pub mod executable;
pub mod job_context;
pub mod retry;
pub use executable::*;
pub use job_context::*;
pub use retry::*;
use chrono::Duration;
use chrono::{DateTime, Utc};
use cron::Schedule;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::str::FromStr;
use uuid::Uuid;
use crate::util::get_now;
use crate::PluginCenter;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Job<M: Executable + Clone> {
pub context: JobContext,
pub data: M,
}
impl<M> Job<M>
where
M: Executable + Clone,
{
pub fn new(job: M) -> Job<M> {
let id = Uuid::new_v4().to_string();
Self {
context: JobContext::new(id),
data: job,
}
}
pub fn id(&self) -> &str {
&self.context.job_id
}
pub fn set_id(mut self, id: String) -> Self {
self.context.job_id = id;
self
}
pub fn set_context(mut self, context: JobContext) -> Self {
self.context = context;
self
}
pub fn context(self, context: JobContext) -> Self {
self.set_context(context)
}
pub fn set_job_type(mut self, job_type: JobType) -> Self {
self.context.job_type = job_type;
self
}
pub fn job_type(self, job_type: JobType) -> Self {
self.set_job_type(job_type)
}
pub fn schedule_at(self, schedule_at: DateTime<Utc>) -> Self {
self.job_type(JobType::new_schedule(schedule_at))
}
pub fn delay(self, after: Duration) -> Self {
let schedule_at = get_now() + after;
self.schedule_at(schedule_at)
}
pub fn cron(self, cron_expression: &str) -> Self {
let cron = JobType::new_cron(cron_expression, CronContext::default()).unwrap();
self.job_type(cron)
}
pub fn set_retry(mut self, retry: Retry) -> Self {
self.context.retry = Some(retry);
self
}
pub fn retry(self, retry: Retry) -> Self {
self.set_retry(retry)
}
}
impl<M> Job<M>
where
M: Executable + Clone + Serialize + Sync + Send + 'static,
{
pub(crate) async fn execute(&mut self) -> <M as Executable>::Output {
PluginCenter::before::<M>(self.id().to_string()).await;
self.context.run_count += 1;
self.data.pre_execute(&self.context).await;
let output = self.data.execute(&self.context).await;
let output = self.data.post_execute(output, &self.context).await;
PluginCenter::after::<M>(self.id().to_string()).await;
output
}
pub fn is_ready(&self) -> bool {
let now = get_now();
match &self.context.job_type {
JobType::ScheduledAt(schedule_at) => &now > schedule_at,
JobType::Cron(_, next_slot, total_repeat, context) => {
if now < *next_slot {
return false;
}
if let Some(max_repeat) = context.max_repeat {
if max_repeat < *total_repeat {
return false;
}
}
if let Some(end_at) = context.end_at {
if now > end_at {
return false;
}
}
true
}
_ => true,
}
}
pub(crate) fn next_tick(&mut self) -> Option<Self> {
let now = get_now();
match &self.context.job_type {
JobType::Cron(cron_expression, _, total_repeat, context) => {
let mut job = self.clone();
let schedule = Schedule::from_str(cron_expression);
if schedule.is_err() {
error!(
"[Job] Cannot parse schedule {cron_expression} of job {}",
job.id(),
);
return None;
}
let schedule = schedule.unwrap();
if let Some(upcoming_event) = schedule.after(&get_now()).next() {
job.context.job_type = JobType::Cron(
cron_expression.clone(),
upcoming_event,
*total_repeat + 1,
context.clone(),
);
}
if let Some(max_repeat) = context.max_repeat {
if max_repeat < *total_repeat {
return None;
}
}
if let Some(end_at) = context.end_at {
if end_at < now {
return None;
}
}
Some(job)
}
_ => None,
}
}
pub fn is_queued(&self) -> bool {
self.context.job_status == JobStatus::Queued
}
pub fn is_running(&self) -> bool {
self.context.job_status == JobStatus::Running
}
pub fn is_cancelled(&self) -> bool {
self.context.job_status == JobStatus::Canceled
}
pub(crate) fn is_done(&self) -> bool {
self.context.job_status == JobStatus::Finished
|| self.context.job_status == JobStatus::Canceled
|| self.context.job_status == JobStatus::Failed
}
}
pub trait BackgroundJob: Executable + Clone {
fn queue_name() -> &'static str;
fn job(self) -> Job<Self>;
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use async_trait::async_trait;
use super::*;
#[derive(Default, Debug, Clone, Serialize)]
pub struct TestJob {
number: i32,
}
#[async_trait::async_trait]
impl Executable for TestJob {
type Output = i32;
async fn execute(&mut self, _context: &JobContext) -> Self::Output {
self.number
}
}
fn default_job(number: i32) -> Job<TestJob> {
Job::new(TestJob { number })
}
#[tokio::test]
async fn test_job() {
let number = 1;
let mut default_job = default_job(number);
assert!(default_job.is_ready());
assert!(default_job.context.job_status == JobStatus::Queued);
assert!(default_job.context.job_type == JobType::Normal);
let output = default_job.execute().await;
assert_eq!(output, number);
assert_eq!(default_job.context.run_count, 1);
}
#[tokio::test]
async fn test_schedule_job() {
let number = 1;
let schedule_at = get_now() + Duration::from_secs(1);
let schedule_job = Job::new(TestJob { number }).schedule_at(schedule_at);
assert!(!schedule_job.is_ready());
assert!(schedule_job.context.job_type == JobType::ScheduledAt(schedule_at))
}
#[tokio::test]
async fn test_cron_job() {
let number = 1;
let expression = "0 1 1 1 * * *";
let schedule_job = Job::new(TestJob { number }).cron(expression);
assert!(!schedule_job.is_ready());
let expected_cron = JobType::new_cron(expression, CronContext::default()).unwrap();
assert!(schedule_job.context.job_type == expected_cron);
}
#[tokio::test]
async fn test_retry() {
#[derive(Default, Debug, Clone, Serialize)]
pub struct TestRetryJob {
number: i32,
}
#[async_trait]
impl Executable for TestRetryJob {
type Output = i32;
async fn execute(&mut self, _: &JobContext) -> Self::Output {
self.number
}
async fn is_failed_output(&self, output: &Self::Output) -> bool {
output % 2 == 0
}
}
let max_retries = 3;
let retry = Retry::new_interval_retry(Some(max_retries), chrono::Duration::seconds(1));
let mut internal_retry_job = Job::new(TestRetryJob { number: 2 }).retry(retry);
for _ in 1..=max_retries {
let output = internal_retry_job.execute().await;
assert!(internal_retry_job.data.is_failed_output(&output).await);
let should_retry_at = internal_retry_job
.data
.retry_at(internal_retry_job.context.retry.as_mut().unwrap(), output)
.await;
assert!(should_retry_at.is_some());
}
}
}