use crate::common::app_config::AppConfig;
use crate::common::byte_utils::id_to_bin;
use crate::common::constant::{JOB_TABLE_NAME, JOB_TASK_TABLE_NAME};
use crate::common::datetime_utils::now_millis;
use crate::common::pb::data_object::{JobDo, JobTaskDo};
use crate::job::job_index::JobQueryParam;
use crate::job::model::actor_model::{
JobManagerRaftReq, JobManagerRaftResult, JobManagerReq, JobManagerResult,
};
use crate::job::model::job::{
JobInfo, JobInfoDto, JobKey, JobParam, JobTaskLogQueryParam, JobWrap,
};
use crate::raft::store::model::SnapshotRecordDto;
use crate::raft::store::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse};
use crate::raft::store::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest};
use crate::schedule::core::ScheduleManager;
use crate::schedule::model::actor_model::ScheduleManagerReq;
use crate::schedule::model::DelayFinishTasks;
use crate::task::model::task::JobTaskInfo;
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use quick_protobuf::{BytesReader, Writer};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
#[bean(inject)]
pub struct JobManager {
pub(crate) job_map: BTreeMap<u64, JobWrap>,
schedule_manager: Option<Addr<ScheduleManager>>,
job_key_map: HashMap<JobKey, u64>,
job_task_log_limit: usize,
}
impl JobManager {
pub fn new(config: &Arc<AppConfig>) -> Self {
JobManager {
job_map: BTreeMap::new(),
job_key_map: HashMap::new(),
schedule_manager: None,
job_task_log_limit: config.job_task_log_limit,
}
}
fn create_job(&mut self, job_param: JobParam) -> anyhow::Result<Arc<JobInfo>> {
let id = job_param.id.unwrap_or_default();
if id == 0 {
return Err(anyhow::anyhow!("CreateJob JobParam.id==0 is invalid!"));
}
if self.job_map.contains_key(&id) {
return Err(anyhow::anyhow!(
"CreateJob,The job already exists and is repeatedly created"
));
}
let mut job_info: JobInfo = job_param.into();
job_info.check_valid()?;
let now = now_millis();
job_info.last_modified_millis = now;
job_info.create_time = now;
let value = Arc::new(job_info);
self.job_map.insert(value.id, JobWrap::new(value.clone()));
if !value.key.is_empty() {
let job_key = value.build_job_key();
self.job_key_map.insert(job_key, value.id);
}
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
schedule_manager.do_send(ScheduleManagerReq::UpdateJob(value.clone()));
}
Ok(value)
}
fn update_job(&mut self, job_param: JobParam) -> anyhow::Result<()> {
let id = job_param.id.unwrap_or_default();
if id == 0 {
return Err(anyhow::anyhow!("UpdateJob JobParam.id==0 is invalid!"));
}
if let Some(job_wrap) = self.job_map.get_mut(&id) {
let job_info = &job_wrap.job;
let old_job_key = if job_info.key.is_empty() {
None
} else {
Some(job_info.build_job_key())
};
let mut new_job = job_info.as_ref().clone();
new_job.update_param(job_param);
job_info.check_valid()?;
let value = Arc::new(new_job);
if let Some(old_key) = old_job_key {
self.job_key_map.remove(&old_key);
}
if !value.key.is_empty() {
let new_job_key = value.build_job_key();
self.job_key_map.insert(new_job_key, value.id);
}
job_wrap.job = value.clone();
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
schedule_manager.do_send(ScheduleManagerReq::UpdateJob(value.clone()));
}
} else {
return Err(anyhow::anyhow!("UpdateJob,Nonexistent Job"));
}
Ok(())
}
fn remove_job(&mut self, id: u64) {
if let Some(job_wrap) = self.job_map.get(&id) {
if !job_wrap.job.key.is_empty() {
let job_key = job_wrap.job.build_job_key();
self.job_key_map.remove(&job_key);
}
}
self.job_map.remove(&id);
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
schedule_manager.do_send(ScheduleManagerReq::RemoveJob(id));
}
}
fn do_update_job(&mut self, job: Arc<JobInfo>) {
if let Some(job_wrap) = self.job_map.get_mut(&job.id) {
if !job_wrap.job.key.is_empty() {
let old_job_key = job_wrap.job.build_job_key();
self.job_key_map.remove(&old_job_key);
}
job_wrap.job = job.clone();
if !job.key.is_empty() {
let new_job_key = job.build_job_key();
self.job_key_map.insert(new_job_key, job.id);
}
} else {
self.job_map.insert(job.id, JobWrap::new(job.clone()));
if !job.key.is_empty() {
let job_key = job.build_job_key();
self.job_key_map.insert(job_key, job.id);
}
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
schedule_manager.do_send(ScheduleManagerReq::UpdateJob(job));
}
}
}
fn update_job_task(&mut self, task_log: Arc<JobTaskInfo>) {
let mut delay_finish = DelayFinishTasks::new();
if let Some(job_wrap) = self.job_map.get_mut(&task_log.job_id) {
if let Some(success) =
job_wrap.update_task_log(task_log.clone(), self.job_task_log_limit)
{
delay_finish.add_task(task_log.task_id, success);
}
}
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
if delay_finish.is_empty() {
schedule_manager.do_send(ScheduleManagerReq::UpdateTask(task_log));
} else {
schedule_manager.do_send(ScheduleManagerReq::DelayFinishTasks(delay_finish));
}
}
}
fn update_job_task_list(&mut self, task_logs: Vec<Arc<JobTaskInfo>>) {
let mut delay_finish = DelayFinishTasks::new();
let mut send_list = vec![];
for task_log in task_logs {
if let Some(job_wrap) = self.job_map.get_mut(&task_log.job_id) {
if let Some(success) =
job_wrap.update_task_log(task_log.clone(), self.job_task_log_limit)
{
delay_finish.add_task(task_log.task_id, success);
continue;
}
}
send_list.push(task_log);
}
if let Some(schedule_manager) = self.schedule_manager.as_ref() {
if !send_list.is_empty() {
schedule_manager.do_send(ScheduleManagerReq::UpdateTaskList(send_list));
}
if !delay_finish.is_empty() {
schedule_manager.do_send(ScheduleManagerReq::DelayFinishTasks(delay_finish));
}
}
}
fn query_jobs(&self, query_param: &JobQueryParam) -> (usize, Vec<JobInfoDto>) {
let mut rlist = Vec::new();
let end_index = query_param.offset + query_param.limit;
let mut index = 0;
for (_, job_wrap) in self.job_map.iter().rev() {
let job_info = &job_wrap.job;
if query_param.match_namespace(&job_info.namespace)
&& query_param.match_app_name(&job_info.app_name)
&& query_param.match_description(&job_info.description)
&& query_param.match_handle_name(&job_info.handle_name)
&& query_param.match_key(&job_info.key)
{
if index >= query_param.offset && index < end_index {
rlist.push(JobInfoDto::new_from(job_info));
}
index += 1;
}
}
(index, rlist)
}
fn query_job_task_logs(
&self,
query_param: &JobTaskLogQueryParam,
) -> (usize, Vec<Arc<JobTaskInfo>>) {
let mut rlist = Vec::new();
let end_index = query_param.offset + query_param.limit;
let mut index = 0;
if let Some(job_wrap) = self.job_map.get(&query_param.job_id) {
for (_task_id, task_log) in job_wrap.task_log_map.iter().rev() {
if index >= query_param.offset && index < end_index {
rlist.push(task_log.clone());
}
index += 1;
}
}
(index, rlist)
}
fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> {
for (key, job_wrap) in &self.job_map {
let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let value_do = job_wrap.job.as_ref().to_do();
writer.write_message(&value_do)?;
}
let record = SnapshotRecordDto {
tree: JOB_TABLE_NAME.clone(),
key: id_to_bin(*key),
value: buf,
op_type: 0,
};
writer.do_send(SnapshotWriterRequest::Record(record));
}
for (_, job_wrap) in &self.job_map {
for (task_id, task_log) in job_wrap.task_log_map.iter() {
let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let value_do = task_log.as_ref().to_do();
writer.write_message(&value_do)?;
}
let record = SnapshotRecordDto {
tree: JOB_TASK_TABLE_NAME.clone(),
key: id_to_bin(*task_id),
value: buf,
op_type: 0,
};
writer.do_send(SnapshotWriterRequest::Record(record));
}
}
Ok(())
}
fn load_snapshot_record(&mut self, record: SnapshotRecordDto) -> anyhow::Result<()> {
if record.tree.as_str() == JOB_TABLE_NAME.as_str() {
let mut reader = BytesReader::from_bytes(&record.value);
let value_do: JobDo = reader.read_message(&record.value)?;
let value = Arc::new(value_do.into());
self.do_update_job(value);
} else if record.tree.as_str() == JOB_TASK_TABLE_NAME.as_str() {
let mut reader = BytesReader::from_bytes(&record.value);
let value_do: JobTaskDo = reader.read_message(&record.value)?;
let value: Arc<JobTaskInfo> = Arc::new(value_do.into());
if let Some(job_wrap) = self.job_map.get_mut(&value.job_id) {
job_wrap.update_task_log(value, self.job_task_log_limit);
}
}
Ok(())
}
fn load_completed(&mut self, _ctx: &mut Context<Self>) -> anyhow::Result<()> {
Ok(())
}
pub(crate) fn get_all_enable_size(&self) -> usize {
self.job_map
.values()
.filter(|job_wrap| job_wrap.job.enable)
.count()
}
}
impl Actor for JobManager {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
log::info!("JobManager started");
}
}
impl Inject for JobManager {
type Context = Context<Self>;
fn inject(
&mut self,
factory_data: FactoryData,
_factory: BeanFactory,
_ctx: &mut Self::Context,
) {
self.schedule_manager = factory_data.get_actor();
}
}
impl Handler<JobManagerReq> for JobManager {
type Result = anyhow::Result<JobManagerResult>;
fn handle(&mut self, msg: JobManagerReq, _ctx: &mut Self::Context) -> Self::Result {
match msg {
JobManagerReq::GetJob(id) => {
let job_info = if let Some(job_wrap) = self.job_map.get(&id) {
Some(job_wrap.job.clone())
} else {
None
};
return Ok(JobManagerResult::JobInfo(job_info));
}
JobManagerReq::GetJobIdByKey(job_key) => {
if job_key.job_key.is_empty() {
return Ok(JobManagerResult::JobId(None));
}
let job_id = self.job_key_map.get(&job_key).copied();
return Ok(JobManagerResult::JobId(job_id));
}
JobManagerReq::UpdateTask(task_info) => {
self.update_job_task(task_info);
}
JobManagerReq::QueryJob(query_param) => {
let (size, list) = self.query_jobs(&query_param);
return Ok(JobManagerResult::JobPageInfo(size, list));
}
JobManagerReq::QueryJobTaskLog(query_param) => {
let (size, list) = self.query_job_task_logs(&query_param);
return Ok(JobManagerResult::JobTaskLogPageInfo(size, list));
}
JobManagerReq::CountJobsByNamespace(namespace) => {
let count = self
.job_map
.values()
.filter(|job| job.job.namespace.as_str() == namespace)
.count();
return Ok(JobManagerResult::Count(count));
}
}
Ok(JobManagerResult::None)
}
}
impl Handler<JobManagerRaftReq> for JobManager {
type Result = anyhow::Result<JobManagerRaftResult>;
fn handle(&mut self, msg: JobManagerRaftReq, _ctx: &mut Self::Context) -> Self::Result {
match msg {
JobManagerRaftReq::AddJob(job_param) => {
let value = self.create_job(job_param)?;
return Ok(JobManagerRaftResult::JobInfo(value));
}
JobManagerRaftReq::UpdateJob(job_param) => {
self.update_job(job_param)?;
}
JobManagerRaftReq::Remove(id) => {
self.remove_job(id);
}
JobManagerRaftReq::UpdateTask(task_info) => {
self.update_job_task(task_info);
}
JobManagerRaftReq::UpdateTaskList(tasks) => {
self.update_job_task_list(tasks);
}
}
Ok(JobManagerRaftResult::None)
}
}
impl Handler<RaftApplyDataRequest> for JobManager {
type Result = anyhow::Result<RaftApplyDataResponse>;
fn handle(&mut self, msg: RaftApplyDataRequest, ctx: &mut Self::Context) -> Self::Result {
match msg {
RaftApplyDataRequest::BuildSnapshot(writer) => {
self.build_snapshot(writer)?;
}
RaftApplyDataRequest::LoadSnapshotRecord(record) => {
self.load_snapshot_record(record)?;
}
RaftApplyDataRequest::LoadCompleted => {
self.load_completed(ctx)?;
}
}
Ok(RaftApplyDataResponse::None)
}
}