ratchjob 0.2.1

一个rust实现的分布式任务调度平台服务。计划完全兼容xxl-job协议,然后再增强一些任务调度平台能力。
Documentation
use crate::common::constant::EMPTY_ARC_STR;
use crate::common::pb::data_object::{JobTaskDo, TaskTryLogDo};
use crate::job::model::job::JobInfo;
use crate::task::model::actor_model::{TriggerItem, TriggerSourceInfo, TriggerSourceType};
use crate::task::model::app_instance::InstanceAddrSelectResult;
use crate::task::model::enum_type::TaskStatusType;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::sync::Arc;

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskTryLog {
    pub execution_time: u32,
    pub addr: Arc<String>,
}

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JobTaskInfo {
    pub task_id: u64,
    pub job_id: u64,
    pub trigger_time: u32,
    pub instance_addr: Arc<String>,
    pub trigger_message: Arc<String>,
    pub status: TaskStatusType,
    pub finish_time: u32,
    pub callback_message: Arc<String>,
    pub execution_time: u32,
    pub trigger_from: Arc<String>,
    pub try_times: u32,
    pub try_logs: Vec<TaskTryLog>,
    pub retry_interval: u32,
    pub retry_count: u32,
    pub timeout_second: u32,
    #[serde(default)]
    pub from_outside: bool,
    #[serde(default)]
    pub trigger_user: Arc<String>,
    #[serde(default)]
    pub namespace: Arc<String>,
    #[serde(default)]
    pub app_name: Arc<String>,
}

impl JobTaskInfo {
    pub fn from_job_trigger(trigger_item: &TriggerItem) -> Self {
        let (from_outside, trigger_user) = match &trigger_item.trigger_source.source_type {
            TriggerSourceType::System => (false, EMPTY_ARC_STR.clone()),
            TriggerSourceType::User(trigger_user) => (true, trigger_user.clone()),
        };
        JobTaskInfo {
            task_id: 0,
            job_id: trigger_item.job_info.id,
            trigger_time: trigger_item.trigger_time,
            instance_addr: EMPTY_ARC_STR.clone(),
            trigger_message: EMPTY_ARC_STR.clone(),
            status: TaskStatusType::Init,
            finish_time: 0,
            callback_message: EMPTY_ARC_STR.clone(),
            execution_time: 0,
            trigger_from: EMPTY_ARC_STR.clone(),
            try_times: trigger_item.job_info.try_times,
            try_logs: vec![],
            retry_interval: trigger_item.job_info.retry_interval,
            retry_count: 0,
            timeout_second: trigger_item.job_info.timeout_second,
            from_outside,
            trigger_user,
            namespace: trigger_item.job_info.namespace.clone(),
            app_name: trigger_item.job_info.app_name.clone(),
        }
    }

    pub fn can_retry(&self) -> bool {
        self.try_times > self.retry_count
    }

    pub fn push_next_try(&mut self) {
        self.retry_count += 1;
        self.try_logs.push(TaskTryLog {
            execution_time: self.execution_time,
            addr: self.instance_addr.clone(),
        });
        self.execution_time = 0;
        self.instance_addr = EMPTY_ARC_STR.clone();
        self.status = TaskStatusType::Running;
    }

    pub fn get_retry_interval(&self) -> u32 {
        if self.retry_interval > 0 {
            self.retry_interval
        } else {
            //默认间隔10秒
            10
        }
    }

    pub fn get_timeout_second(&self, default_value: u32) -> u32 {
        if self.timeout_second > 0 {
            self.timeout_second
        } else {
            default_value
        }
    }

    pub fn to_do(&self) -> JobTaskDo<'_> {
        JobTaskDo {
            task_id: self.task_id,
            job_id: self.job_id,
            trigger_time: self.trigger_time,
            instance_addr: Cow::Borrowed(&self.instance_addr),
            trigger_message: Cow::Borrowed(&self.trigger_message),
            status: Cow::Borrowed(self.status.to_str()),
            finish_time: self.finish_time,
            callback_message: Cow::Borrowed(&self.callback_message),
            execution_time: self.execution_time,
            trigger_from: Cow::Borrowed(&self.trigger_from),
            try_times: self.try_times,
            try_logs: self
                .try_logs
                .iter()
                .map(|log| TaskTryLogDo {
                    execution_time: log.execution_time,
                    addr: Cow::Borrowed(&log.addr),
                })
                .collect(),
            retry_interval: self.retry_interval,
            retry_count: self.retry_count,
            timeout_second: self.timeout_second,
            from_outside: self.from_outside,
            trigger_user: Cow::Borrowed(&self.trigger_user),
            namespace: Cow::Borrowed(&self.namespace),
            app_name: Cow::Borrowed(&self.app_name),
        }
    }
}

impl<'a> From<JobTaskDo<'a>> for JobTaskInfo {
    fn from(task_do: JobTaskDo<'a>) -> Self {
        JobTaskInfo {
            task_id: task_do.task_id,
            job_id: task_do.job_id,
            trigger_time: task_do.trigger_time,
            instance_addr: Arc::new(task_do.instance_addr.to_string()),
            trigger_message: Arc::new(task_do.trigger_message.to_string()),
            status: TaskStatusType::from_str(&task_do.status),
            finish_time: task_do.finish_time,
            callback_message: Arc::new(task_do.callback_message.to_string()),
            execution_time: task_do.execution_time,
            trigger_from: Arc::new(task_do.trigger_from.to_string()),
            try_times: task_do.try_times,
            try_logs: task_do
                .try_logs
                .iter()
                .map(|log| TaskTryLog {
                    execution_time: log.execution_time,
                    addr: Arc::new(log.addr.to_string()),
                })
                .collect(),
            retry_interval: task_do.retry_interval,
            retry_count: task_do.retry_count,
            timeout_second: task_do.timeout_second,
            from_outside: task_do.from_outside,
            trigger_user: Arc::new(task_do.trigger_user.to_string()),
            namespace: Arc::new(task_do.namespace.to_string()),
            app_name: Arc::new(task_do.app_name.to_string()),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskCallBackParam {
    pub task_id: u64,
    pub task_date_time: i64,
    pub success: bool,
    pub handle_msg: Option<Arc<String>>,
}

pub struct TaskWrap {
    pub task: JobTaskInfo,
    pub job_info: Arc<JobInfo>,
    pub select_result: InstanceAddrSelectResult,
    pub app_addrs: Arc<Vec<Arc<String>>>,
    pub trigger_source: TriggerSourceInfo,
}

#[derive(Debug, Clone, Default)]
pub struct UpdateTaskMetricsInfo {
    pub success_count: u64,
    pub fail_count: u64,
}

impl UpdateTaskMetricsInfo {
    pub fn add(&mut self, task_info: &UpdateTaskMetricsInfo) {
        self.success_count += task_info.success_count;
        self.fail_count += task_info.fail_count;
    }
}