1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use crate::{error::Error, job::Job, scheduler::Scheduler, Queue, Result};
use log::{error, info};
use std::env;
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::{Duration, Instant},
};

pub struct Worker {
    queue: Queue,
    should_exit: AtomicBool,
    scheduler: Option<Scheduler>,
}

impl Worker {
    pub fn new(queue_name: &str, redis_connection_string: &str) -> Result<Self> {
        Ok(Self {
            queue: Queue::new(queue_name, redis_connection_string)?,
            should_exit: AtomicBool::new(false),
            scheduler: None,
        })
    }

    pub fn new_from_env() -> Result<Self> {
        Self::new(
            &env::var("QUEUE_NAME").expect("QUEUE_NAME should be set"),
            &env::var("REDIS_URL").expect("REDIS_URL should be set"),
        )
    }

    fn maybe_process_job(&mut self, timeout: usize) -> Result<Option<Job>> {
        let maybe_job = self.queue.dequeue_executable_job(timeout)?;
        match maybe_job {
            Some(job) => {
                if let Err(e) = job.executable.execute() {
                    Err(Error::JobExecutionError {
                        job_name: job.name,
                        error: e.to_string(),
                    })
                } else {
                    Ok(Some(job))
                }
            }
            None => Ok(None),
        }
    }

    pub fn run(&mut self) -> Result<()> {
        let mut now = Instant::now();
        let mut first = true;
        while !self.should_exit.load(Ordering::SeqCst) {
            if let Some(scheduler) = &mut self.scheduler {
                if first || now.elapsed() > Duration::from_secs(60) {
                    scheduler.tick()?;
                    first = false;
                    now = Instant::now();
                }
            }
            match self.maybe_process_job(10) {
                Ok(None) => continue,
                Ok(Some(job)) => {
                    info!("Successfully executed {}.", job.name);
                    self.queue.delete_job_data(&job.name)?;
                    if let Some(scheduler) = &mut self.scheduler {
                        scheduler.job_succeeded(&job.name)?;
                    }
                }
                Err(Error::JobExecutionError { job_name, error }) => {
                    error!("Failed to execute job {} with error: {}.", job_name, error);
                    self.queue.delete_job_data(&job_name)?;
                }
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    pub fn request_shutdown(&self) {
        self.should_exit.store(true, Ordering::SeqCst);
    }

    pub fn create_scheduler(&self) -> Result<Scheduler> {
        Scheduler::new(self.queue.name(), self.queue.get_client_clone())
    }

    pub fn use_scheduler(&mut self, scheduler: Scheduler) {
        self.scheduler = Some(scheduler);
    }
}