ratchjob 0.2.1

一个用 Rust 实现的分布式任务调度平台服务。计划完全兼容 xxl-job 协议,并在此基础上增强任务调度平台的能力。
Documentation
pub mod actor_model;
pub mod finish_mark;

use crate::common::cron_utils::CronUtil;
use crate::job::model::enum_type::ScheduleType;
use crate::job::model::job::JobInfo;
use chrono::{DateTime, TimeZone};
use cron::Schedule;
use std::str::FromStr;
use std::sync::Arc;

/// Runtime scheduling state kept for a single job.
///
/// Time fields are `u32` values that are compared against
/// `DateTime::timestamp()`, i.e. they are Unix timestamps in seconds.
#[derive(Clone, Debug)]
pub struct JobRunState {
    /// Job id, copied from the source job's `id`.
    pub id: u64,
    /// Schedule kind; selects how trigger times are computed.
    pub schedule_type: ScheduleType,
    /// Cron expression text as configured on the source job.
    pub cron_value: Arc<String>,
    /// Parsed form of `cron_value`; `None` when the text failed to parse.
    pub cron_schedule: Option<Schedule>,
    /// Seconds added to `last_finish_time` when computing `Delay` triggers.
    pub delay_second: u32,
    /// Period in seconds for `Interval` schedules (values below 1 are treated as 1).
    pub interval_second: u32,
    /// Reference time that `Interval` triggers are aligned to.
    /// NOTE(review): presumably the previous trigger time; it is only read in
    /// this section, so confirm against the code that writes it.
    pub pre_trigger_time: u32,
    /// NOTE(review): presumably the next planned trigger time; it is
    /// initialised to 0 and not otherwise used in this section.
    pub next_trigger_time: u32,
    /// Finish time most recently recorded through `finish_job`.
    pub last_finish_time: u32,
    /// Set to `true` by `finish_job` for `Delay` jobs;
    /// `calculate_next_trigger_time` only yields a delay trigger while it is set.
    pub next_active: bool,
    /// Counter advanced (wrapping to 0 after `u32::MAX`) by `update_job`
    /// whenever a schedule setting changes.
    pub version: u32,
    /// NOTE(review): initialised to 0 and not used in this section; presumably
    /// routing state maintained elsewhere — confirm.
    pub route_value: u32,
    /// The job definition this state was created from or last updated with.
    pub source_job: Arc<JobInfo>,
    /// The next trigger task has already been marked.
    pub marked_delay_trigger: bool,
}

impl JobRunState {
    pub fn new(source_job: Arc<JobInfo>) -> Self {
        let cron_schedule = Schedule::from_str(source_job.cron_value.as_str()).ok();
        JobRunState {
            id: source_job.id,
            schedule_type: source_job.schedule_type.clone(),
            cron_value: source_job.cron_value.clone(),
            cron_schedule,
            delay_second: source_job.delay_second,
            interval_second: source_job.interval_second,
            pre_trigger_time: 0,
            last_finish_time: 0,
            next_trigger_time: 0,
            next_active: false,
            version: 0,
            route_value: 0,
            source_job,
            marked_delay_trigger: false,
        }
    }
    pub fn calculate_first_trigger_time<T: TimeZone>(&self, datetime: &DateTime<T>) -> u32 {
        match self.schedule_type {
            ScheduleType::Delay => {
                let timestamp_seconds = datetime.timestamp() as u32;
                std::cmp::max(self.last_finish_time + self.delay_second, timestamp_seconds)
            }
            ScheduleType::None => 0,
            _ => self.calculate_next_trigger_time(datetime),
        }
    }

    pub fn update_job(&mut self, source_job: Arc<JobInfo>) -> bool {
        let mut change_schedule = false;
        if self.schedule_type != source_job.schedule_type {
            change_schedule = true;
            self.schedule_type = source_job.schedule_type.clone();
        }
        if self.cron_value.as_str() != source_job.cron_value.as_str() {
            change_schedule = true;
            self.cron_value = source_job.cron_value.clone();
            self.cron_schedule = Schedule::from_str(source_job.cron_value.as_str()).ok();
        }
        if self.interval_second != source_job.interval_second {
            change_schedule = true;
            self.interval_second = source_job.interval_second;
        }
        if self.delay_second != source_job.delay_second {
            change_schedule = true;
            self.delay_second = source_job.delay_second;
        }
        self.source_job = source_job;
        if change_schedule {
            if self.version == u32::MAX {
                self.version = 0;
            } else {
                self.version += 1;
            }
        }
        change_schedule
    }

    pub fn finish_job(&mut self, finish_time: u32) {
        self.last_finish_time = finish_time;
        if self.schedule_type == ScheduleType::Delay {
            self.next_active = true;
        }
    }

    pub fn calculate_next_trigger_time<T: TimeZone>(&self, datetime: &DateTime<T>) -> u32 {
        let mut result = 0;
        let timestamp_seconds = datetime.timestamp() as u32;
        match self.schedule_type {
            ScheduleType::Cron => {
                if let Some(cron_schedule) = self.cron_schedule.as_ref() {
                    if let Ok(value) = CronUtil::next_cron_time(cron_schedule, datetime) {
                        result = value;
                    }
                }
            }
            ScheduleType::Interval => {
                let interval_second = std::cmp::max(1, self.interval_second);
                let remainder = ((timestamp_seconds as i32) - (self.pre_trigger_time as i32))
                    .rem_euclid(interval_second as i32);
                result = (timestamp_seconds as i32 - remainder) as u32 + interval_second;
            }
            ScheduleType::Delay => {
                if self.next_active {
                    result =
                        std::cmp::max(self.last_finish_time + self.delay_second, timestamp_seconds);
                }
            }
            ScheduleType::None => {}
        }
        result
    }
}

/// A trigger record: a job id paired with a trigger time and a version number.
#[derive(Clone, Debug)]
pub struct TriggerInfo {
    /// Id of the job this record refers to.
    pub job_id: u64,
    /// Trigger time carried by this record.
    pub trigger_time: u32,
    /// Version number carried by this record.
    pub version: u32,
}

impl TriggerInfo {
    /// Bundles the given job id, trigger time and version into a `TriggerInfo`.
    pub fn new(job_id: u64, trigger_time: u32, version: u32) -> Self {
        Self {
            job_id,
            trigger_time,
            version,
        }
    }
}

/// Kind tag stored in a [`RedoInfo`].
///
/// NOTE(review): only the variant names are declared in this section; how each
/// kind is handled is not visible here.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RedoType {
    Retry,
    Timeout,
    Redo,
}

/// A task id together with the [`RedoType`] attached to it.
#[derive(Clone, Debug)]
pub struct RedoInfo {
    /// Id of the task this record refers to.
    pub task_id: u64,
    /// Kind tag attached to the task.
    pub redo_type: RedoType,
}

impl RedoInfo {
    /// Pairs `task_id` with `redo_type`.
    pub fn new(task_id: u64, redo_type: RedoType) -> Self {
        Self { task_id, redo_type }
    }
}

/// Two lists of task ids, split by outcome: succeeded and failed.
///
/// Ids are appended in call order and are not de-duplicated.
#[derive(Clone, Debug, Default)]
pub struct DelayFinishTasks {
    /// Ids of tasks recorded as successful.
    pub success_tasks: Vec<u64>,
    /// Ids of tasks recorded as failed.
    pub fail_tasks: Vec<u64>,
}

impl DelayFinishTasks {
    /// Creates an empty collection; equivalent to `DelayFinishTasks::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `task_id` as successful when `task_result` is `true`,
    /// otherwise as failed.
    pub fn add_task(&mut self, task_id: u64, task_result: bool) {
        if task_result {
            self.add_success_task(task_id);
        } else {
            self.add_fail_task(task_id);
        }
    }

    /// Appends `task_id` to the successful list.
    pub fn add_success_task(&mut self, task_id: u64) {
        self.success_tasks.push(task_id);
    }

    /// Appends `task_id` to the failed list.
    pub fn add_fail_task(&mut self, task_id: u64) {
        self.fail_tasks.push(task_id);
    }

    /// Returns `true` when neither list contains any task id.
    pub fn is_empty(&self) -> bool {
        self.success_tasks.is_empty() && self.fail_tasks.is_empty()
    }
}