use async_trait::async_trait;
use chrono::{DateTime, Local};
use cron::Schedule;
use dashmap::DashMap;
use parking_lot::RwLock;
use serde_json::Value;
use std::str::FromStr;
use std::sync::Arc;
use crate::errors::{SchedulerError, SchedulerErrorKind};
use crate::job::{JobContext, ScheduleJob, WillExecuteJobFuture};
/// Asynchronous storage backend for job handlers, cron-scheduled jobs and
/// pending retries.
///
/// The `Tz` parameter is not referenced by any method signature here; it only
/// constrains the implementing type (see `MemoryJobStorage` in this file).
/// Behavioural notes below describe what `MemoryJobStorage` does; other
/// implementations may differ.
#[async_trait]
pub trait JobStorage<Tz>: Send + Sync
where
Tz: chrono::TimeZone,
Tz::Offset: Send + Sync,
{
/// Registers a job implementation.
///
/// `MemoryJobStorage` keys it by `job.get_job_name()` and fails with
/// `SchedulerErrorKind::JobRegistered` when that name is already present.
async fn register_job(&self, job: Box<dyn ScheduleJob>) -> Result<(), SchedulerError>;
/// Schedules the job named `job_name` with the cron expression `cron` and
/// optional `args`, returning the id of the new schedule entry.
///
/// `MemoryJobStorage` fails with `SchedulerErrorKind::CronInvalid` when the
/// expression cannot be parsed, and does not check that `job_name` is registered.
async fn add_job(
&self,
job_name: &str,
cron: &str,
args: &Option<Value>,
) -> Result<String, SchedulerError>;
/// Queues a retry of `job_name` on behalf of the job `origin_id`, carrying
/// `args` and the `retry_times` counter. Returns the id of the retry entry.
async fn add_retry_job(
&self,
origin_id: &str,
job_name: &str,
args: &Option<Value>,
retry_times: u64,
) -> Result<String, SchedulerError>;
/// Removes the scheduled job identified by `id`.
async fn delete_job(&self, id: &str) -> Result<(), SchedulerError>;
/// Reports whether a scheduled job with `id` exists.
async fn has_job(&self, id: &str) -> Result<bool, SchedulerError>;
/// Returns the jobs that should run now.
///
/// `MemoryJobStorage` yields tuples of
/// `(job name, job id, args, retry times)`; entries coming from cron
/// schedules carry a retry count of `0`, entries coming from the retry queue
/// carry the original job id and the stored retry count.
async fn get_all_should_execute_jobs(
&self,
) -> Result<Vec<(String, String, Option<Value>, u64)>, SchedulerError>;
/// Restores previously stored jobs.
///
/// `MemoryJobStorage` implements this as a no-op.
/// NOTE(review): presumably meant for persistent backends — confirm.
async fn restore_jobs(&self) -> Result<(), SchedulerError>;
/// Builds the executable future for the registered job `name`, using a
/// context made from `id`, `args` and `retry_times`.
///
/// `MemoryJobStorage` returns `Ok(None)` when no job is registered under `name`.
async fn get_job_by_name(
&self,
name: &str,
id: &str,
args: &Option<Value>,
retry_times: u64,
) -> Result<Option<WillExecuteJobFuture>, SchedulerError>;
/// Batch form of `get_job_by_name`.
///
/// Each input tuple is `(name, id, args, retry_times)`; each output tuple is
/// `(future, name, id, args)`. `MemoryJobStorage` silently skips inputs whose
/// name is not registered.
async fn get_jobs_by_name(
&self,
exprs: &Vec<(String, String, Option<Value>, u64)>,
) -> Result<Vec<(WillExecuteJobFuture, String, String, Option<Value>)>, SchedulerError>;
}
/// `JobStorage` implementation that keeps everything in process memory.
///
/// `Tz` is the time zone type used for schedule evaluation and defaults to
/// `chrono::Utc`.
pub struct MemoryJobStorage<Tz = chrono::Utc>
where
Tz: chrono::TimeZone + Sync + Send,
{
/// Registered job implementations, keyed by the value of `get_job_name()`.
tasks: DashMap<String, Box<dyn ScheduleJob>>,
/// Cron-scheduled jobs, keyed by a generated UUID string.
/// Value layout: `(schedule, job name, args)`.
jobs: DashMap<String, (Schedule, String, Option<Value>)>,
/// Pending retries, keyed by a generated UUID string.
/// Value layout: `(origin job id, job name, args, retry times)`.
retry_jobs: DashMap<String, (String, String, Option<Value>, u64)>,
/// Time zone that "now" is converted into when polling for due jobs.
timezone: Tz,
/// Upper bound of the previous poll window; cron fire times are searched
/// strictly after this instant and it is overwritten on every poll.
last_check_time: Arc<RwLock<DateTime<Tz>>>,
}
impl<Tz> MemoryJobStorage<Tz>
where
    Tz: chrono::TimeZone + Send + Sync,
{
    /// Creates an empty storage that evaluates schedules in `timezone`.
    ///
    /// The poll window starts at the moment of construction: only cron fire
    /// times after "now" are ever reported.
    pub fn new(timezone: Tz) -> Self {
        // Compute the start instant first so `timezone` can be moved into the
        // struct without an extra clone.
        let started_at = Local::now().with_timezone(&timezone);
        Self {
            tasks: DashMap::new(),
            jobs: DashMap::new(),
            retry_jobs: DashMap::new(),
            timezone,
            last_check_time: Arc::new(RwLock::new(started_at)),
        }
    }

    /// Replaces the time zone used for schedule evaluation and returns the
    /// storage (builder style).
    ///
    /// The stored last-check instant is re-expressed in `tz` as well. The cron
    /// iterator evaluates expressions in the zone of the datetime it starts
    /// from, so leaving the old zone in place would make the first poll after
    /// this call use the previous time zone.
    pub fn with_timezone(mut self, tz: Tz) -> Self {
        // Same instant, new zone. The read guard is a temporary dropped at the
        // end of this statement, so taking the write lock next cannot deadlock.
        let rebased = self.last_check_time.read().with_timezone(&tz);
        *self.last_check_time.write() = rebased;
        self.timezone = tz;
        self
    }
}
impl Default for MemoryJobStorage<chrono::Utc> {
    /// Builds an empty UTC storage whose poll window starts now.
    fn default() -> Self {
        // `new` produces exactly the same field values for `chrono::Utc`.
        Self::new(chrono::Utc)
    }
}
// Manually asserts that `MemoryJobStorage` may be moved to and shared between
// threads for any `Tz: TimeZone + Sync + Send`.
// NOTE(review): the bounds here do not require `Tz::Offset: Send + Sync`, yet
// the struct stores a `DateTime<Tz>`; they also do not constrain the boxed
// `dyn ScheduleJob` values. These impls bypass the compiler's auto-trait
// checks for those fields — confirm that every stored type is really
// thread-safe, or drop the impls if the auto traits already apply.
unsafe impl<Tz: chrono::TimeZone + Sync + Send> Send for MemoryJobStorage<Tz> {}
unsafe impl<Tz: chrono::TimeZone + Sync + Send> Sync for MemoryJobStorage<Tz> {}
#[async_trait]
impl<Tz> JobStorage<Tz> for MemoryJobStorage<Tz>
where
Tz: chrono::TimeZone + Sync + Send,
Tz::Offset: Send + Sync,
{
async fn register_job(&self, job: Box<dyn ScheduleJob>) -> Result<(), SchedulerError> {
let is_registered = self.tasks.get(&job.get_job_name()).is_some();
if is_registered {
return Err(SchedulerError::new(SchedulerErrorKind::JobRegistered));
}
self.tasks.insert(job.get_job_name(), job);
Ok(())
}
/// Parses `cron`, stores a new schedule entry for `job_name` with `args`, and
/// returns the freshly generated UUID that identifies the entry.
///
/// # Errors
///
/// Returns `SchedulerErrorKind::CronInvalid` when `cron` cannot be parsed; in
/// that case nothing is stored.
async fn add_job(
    &self,
    job_name: &str,
    cron: &str,
    args: &Option<Value>,
) -> Result<String, SchedulerError> {
    let schedule = Schedule::from_str(cron)
        .map_err(|_| SchedulerError::new(SchedulerErrorKind::CronInvalid))?;

    let job_id = uuid::Uuid::new_v4().to_string();
    let entry = (schedule, job_name.to_string(), args.clone());
    self.jobs.insert(job_id.clone(), entry);

    Ok(job_id)
}
/// Queues a retry for `job_name` on behalf of the job `origin_id`.
///
/// The entry is stored under a freshly generated UUID, which is returned.
/// This implementation never fails.
async fn add_retry_job(
    &self,
    origin_id: &str,
    job_name: &str,
    args: &Option<Value>,
    retry_times: u64,
) -> Result<String, SchedulerError> {
    let retry_id = uuid::Uuid::new_v4().to_string();

    // Stored layout: (origin job id, job name, args, retry times).
    let pending = (
        origin_id.to_string(),
        job_name.to_string(),
        args.clone(),
        retry_times,
    );
    self.retry_jobs.insert(retry_id.clone(), pending);

    Ok(retry_id)
}
/// Removes the cron-scheduled job stored under `id`.
///
/// Succeeds whether or not such a job exists. Only the schedule map is
/// touched; queued retries are left as they are.
async fn delete_job(&self, id: &str) -> Result<(), SchedulerError> {
    // The removed entry, if any, is not needed by the caller.
    let _ = self.jobs.remove(id);
    Ok(())
}
/// Returns `true` when a cron-scheduled job is stored under `id`.
async fn has_job(&self, id: &str) -> Result<bool, SchedulerError> {
    Ok(self.jobs.contains_key(id))
}
/// Collects every job that is due and advances the poll window.
///
/// Returns tuples of `(job name, job id, args, retry times)`:
///
/// * one entry per cron fire time that falls in `(last check, now]` for each
///   scheduled job, with a retry count of `0` (a schedule that fired several
///   times since the last poll therefore appears several times);
/// * followed by every queued retry, carrying the origin job id and its
///   stored retry count. The retry queue is drained by this call.
///
/// This implementation never fails.
async fn get_all_should_execute_jobs(
    &self,
) -> Result<Vec<(String, String, Option<Value>, u64)>, SchedulerError> {
    let mut due = Vec::new();

    {
        // Hold the write lock for the whole read-scan-update sequence so that
        // concurrent pollers cannot scan the same window and report the same
        // fire times twice. The guard is confined to this block.
        let mut last_check = self.last_check_time.write();
        // Sample "now" only after the lock is held, so the window can never
        // move backwards.
        let now = Local::now().with_timezone(&self.timezone);

        for entry in self.jobs.iter() {
            let (schedule, job_name, args) = entry.value();
            // Fire times are produced in ascending order; stop at the first
            // one that lies in the future.
            for _ in schedule
                .after(&*last_check)
                .take_while(|fire_time| *fire_time <= now)
            {
                due.push((job_name.clone(), entry.key().clone(), args.clone(), 0_u64));
            }
        }

        *last_check = now;
    }

    // Drain the retry queue atomically per shard: returning `false` removes
    // each visited entry, so a retry inserted concurrently is either picked up
    // here or kept for the next poll, never dropped unseen (which a separate
    // `iter()` followed by `clear()` would allow).
    self.retry_jobs.retain(|_, retry| {
        due.push((
            std::mem::take(&mut retry.1), // job name
            std::mem::take(&mut retry.0), // origin job id
            retry.2.take(),               // args
            retry.3,                      // retry times
        ));
        false
    });

    Ok(due)
}
/// No-op for this in-memory implementation: nothing is restored and the call
/// always returns `Ok(())`.
async fn restore_jobs(&self) -> Result<(), SchedulerError> {
Ok(())
}
/// Prepares the registered job `name` for execution.
///
/// A `JobContext` is built from `id`, `args` and `retry_times`, the job's
/// `execute` is called with a copy of it, and both are wrapped in a
/// `WillExecuteJobFuture`. Returns `Ok(None)` when no job is registered under
/// `name`. This implementation never fails.
async fn get_job_by_name(
    &self,
    name: &str,
    id: &str,
    args: &Option<Value>,
    retry_times: u64,
) -> Result<Option<WillExecuteJobFuture>, SchedulerError> {
    let prepared = self.tasks.get(name).map(|handler| {
        let context = JobContext::new(id.to_string(), args.clone(), retry_times);
        let execution = handler.execute(context.clone());
        WillExecuteJobFuture::new(execution, context)
    });
    Ok(prepared)
}
/// Batch variant of `get_job_by_name`.
///
/// Each input tuple is `(name, id, args, retry_times)`. For every input whose
/// name is registered, the output holds `(future, name, id, args)`, in input
/// order; inputs with an unregistered name are skipped. An error from
/// `get_job_by_name` aborts the whole call.
async fn get_jobs_by_name(
    &self,
    exprs: &Vec<(String, String, Option<Value>, u64)>,
) -> Result<Vec<(WillExecuteJobFuture, String, String, Option<Value>)>, SchedulerError> {
    let mut prepared = Vec::new();
    for (name, id, args, retry_times) in exprs.iter() {
        if let Some(future) = self.get_job_by_name(name, id, args, *retry_times).await? {
            prepared.push((future, name.clone(), id.clone(), args.clone()));
        }
    }
    Ok(prepared)
}
}