1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::collections::HashMap;
use chrono::Utc;
use redis::{Commands, Client, Connection};
use saffron::Cron;
use crate::error::Error;
use crate::{Executable, Queue, Result};
use crate::scheduled_job::ScheduledJob;
pub struct Scheduler {
jobs: HashMap<String, ScheduledJob>,
queue_name: String,
redis_client: Client,
redis_connection: Connection,
}
impl Scheduler {
pub fn new(qname: &str, client: Client) -> Result<Self> {
Ok(Scheduler { jobs: HashMap::new(), queue_name: qname.to_string(), redis_connection: client.get_connection()?, redis_client: client})
}
pub fn register_job<F>(&mut self, name: &str, schedule: &str, create_instance: F) -> Result<()> where F: Fn() -> Box<dyn Executable> + 'static + Sync {
let cron: Cron = schedule.parse().map_err(Error::CronParsingError)?;
self.jobs.insert(name.to_string(), ScheduledJob{schedule: cron, create_instance: Box::new(create_instance)});
let score: Option<u64> = self.redis_connection.zscore(self.scheduled_jobs_key(), name)?;
if score.is_none() {
self.job_succeeded(name)?;
}
Ok(())
}
pub fn tick(&mut self) -> Result<()> {
let current_timestamp = chrono::Utc::now().timestamp();
let job_names = self.get_due_job_names(current_timestamp)?;
let mut queue = Queue::from_client(&self.queue_name, self.redis_client.clone())?;
for name in job_names {
let job = self.jobs.get(&name).expect("We did not know about a scheduled job");
queue.enqueue(&name, (job.create_instance)())?;
}
self.redis_connection.zrembyscore(self.scheduled_jobs_key(), 0, current_timestamp)?;
Ok(())
}
fn get_due_job_names(&mut self, timestamp: i64) -> Result<Vec<String>> {
Ok(self.redis_connection.zrangebyscore(self.scheduled_jobs_key(), 0, timestamp)?)
}
fn scheduled_jobs_key(&self) -> String {
format!("dil.{}.scheduled", self.queue_name)
}
pub fn job_succeeded(&mut self, name: &str) -> Result<()> {
if let Some(job) = self.jobs.get(name) {
if let Some(next_datetime) = job.schedule.next_after(Utc::now()) {
self.redis_connection.zadd(self.scheduled_jobs_key(), name, next_datetime.timestamp())?;
}
}
Ok(())
}
}