use crate::common::constant::{EMPTY_ARC_STR, SEQ_JOB_ID};
use crate::common::datetime_utils::{now_millis, now_second_u32};
use crate::common::model::{ApiResult, PageResult, UserSession};
use crate::common::share_data::ShareData;
use crate::console::model::job::{
JobInfoParam, JobQueryListRequest, JobTaskLogQueryListRequest, TriggerJobParam,
};
use crate::console::v1::{
ERROR_CODE_JOB_KEY_DUPLICATE, ERROR_CODE_NO_APP_PERMISSION, ERROR_CODE_SYSTEM_ERROR,
};
use crate::job::model::actor_model::{
JobManagerRaftReq, JobManagerRaftResult, JobManagerReq, JobManagerResult,
};
use crate::job::model::job::{JobKey, JobParam};
use crate::raft::store::{ClientRequest, ClientResponse};
use crate::schedule::model::actor_model::{ScheduleManagerReq, ScheduleManagerResult};
use crate::sequence::{SequenceRequest, SequenceResult};
use crate::task::model::actor_model::{TaskManagerReq, TriggerItem};
use actix_http::HttpMessage;
use actix_web::web::Data;
use actix_web::{web, HttpResponse, Responder};
use std::sync::Arc;
pub(crate) async fn query_job_list(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Query(request): web::Query<JobQueryListRequest>,
) -> impl Responder {
let session = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
let param = request.to_param_with_session(&session);
if let Ok(Ok(JobManagerResult::JobPageInfo(total_count, list))) = share_data
.job_manager
.send(JobManagerReq::QueryJob(param))
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(PageResult { total_count, list })))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_job_list error".to_string()),
))
}
}
pub(crate) async fn query_job_info(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Query(param): web::Query<JobInfoParam>,
) -> impl Responder {
let id = param.id.unwrap_or_default();
if id == 0 {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_job_info error,the job id is invalid".to_string()),
));
}
let app_privilege = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.app_privilege.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
if let Ok(Ok(JobManagerResult::JobInfo(Some(info)))) =
share_data.job_manager.send(JobManagerReq::GetJob(id)).await
{
if !app_privilege.check_permission(&info.app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", &info.app_name)),
));
}
HttpResponse::Ok().json(ApiResult::success(Some(info)))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_job_info error".to_string()),
))
}
}
async fn do_create_job(
share_data: Data<Arc<ShareData>>,
mut param: JobParam,
) -> anyhow::Result<HttpResponse> {
param.check_valid()?;
if param.key.is_none() || param.key.as_ref().unwrap().is_empty() {
param.key = Some(Arc::new(uuid::Uuid::new_v4().to_string().replace('-', "")));
}
if let Some(ref key) = param.key {
if !key.is_empty() {
let job_key = JobKey::new(
¶m.namespace.as_ref().unwrap(),
¶m.app_name.as_ref().unwrap(),
key,
);
if let Ok(Ok(JobManagerResult::JobId(Some(_)))) = share_data
.job_manager
.send(JobManagerReq::GetJobIdByKey(job_key))
.await
{
return Err(anyhow::anyhow!(
"job key already exists: namespace={}, app_name={}, key={}",
param.namespace.as_ref().unwrap(),
param.app_name.as_ref().unwrap(),
key
));
}
}
}
if let SequenceResult::NextId(id) = share_data
.sequence_manager
.send(SequenceRequest::GetNextId(SEQ_JOB_ID.clone()))
.await??
{
param.id = Some(id);
param.update_time = Some(now_millis());
if let ClientResponse::JobResp {
resp: JobManagerRaftResult::JobInfo(job),
} = share_data
.raft_request_route
.request(ClientRequest::JobReq {
req: JobManagerRaftReq::AddJob(param),
})
.await?
{
Ok(HttpResponse::Ok().json(ApiResult::success(Some(job))))
} else {
Err(anyhow::anyhow!("create job result type error!"))
}
} else {
Err(anyhow::anyhow!("get job id error!"))
}
}
pub(crate) async fn create_job(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Json(param): web::Json<JobInfoParam>,
) -> impl Responder {
let param = param.to_param();
let app_privilege = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.app_privilege.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
if let Some(app_name) = param.app_name.as_ref() {
if !app_privilege.check_permission(app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", app_name)),
));
}
}
match do_create_job(share_data, param).await {
Ok(v) => v,
Err(e) => {
let error_msg = format!("create_job error,{}", e);
let error_code = if error_msg.find("job key already exists").is_some() {
ERROR_CODE_JOB_KEY_DUPLICATE.to_string()
} else {
ERROR_CODE_SYSTEM_ERROR.to_string()
};
HttpResponse::Ok().json(ApiResult::<()>::error(error_code, Some(error_msg)))
}
}
}
pub(crate) async fn update_job(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Json(param): web::Json<JobInfoParam>,
) -> impl Responder {
let param = param.to_param();
let id = param.id.clone().unwrap_or_default();
if id == 0 {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("update_job error,the job id is invalid".to_string()),
));
}
let app_privilege = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.app_privilege.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
if let Some(app_name) = param.app_name.as_ref() {
if !app_privilege.check_permission(app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", app_name)),
));
}
}
let original_job_info = if let Ok(Ok(JobManagerResult::JobInfo(Some(info)))) =
share_data.job_manager.send(JobManagerReq::GetJob(id)).await
{
if !app_privilege.check_permission(&info.app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", &info.app_name)),
));
}
Some(info)
} else {
None
};
if let Some(ref key) = param.key {
if !key.is_empty() {
let (namespace, app_name) = match (¶m.namespace, ¶m.app_name) {
(Some(ns), Some(app)) => (ns.clone(), app.clone()),
_ => {
if let Some(ref info) = original_job_info {
(info.namespace.clone(), info.app_name.clone())
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some(
"update_job error,namespace/app_name is required when key is set"
.to_string(),
),
));
}
}
};
let job_key = JobKey::new(&namespace, &app_name, key);
if let Ok(Ok(JobManagerResult::JobId(Some(existing_id)))) = share_data
.job_manager
.send(JobManagerReq::GetJobIdByKey(job_key))
.await
{
if existing_id != id {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_JOB_KEY_DUPLICATE.to_string(),
Some(format!(
"job key already exists: namespace={}, app_name={}, key={}",
namespace, app_name, key
)),
));
}
}
}
}
if let Err(e) = param.check_valid() {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some(format!("update_job error,{}", e)),
));
}
if let Ok(_) = share_data
.raft_request_route
.request(ClientRequest::JobReq {
req: JobManagerRaftReq::UpdateJob(param),
})
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(())))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("update_job error".to_string()),
))
}
}
pub(crate) async fn remove_job(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Json(param): web::Json<JobInfoParam>,
) -> impl Responder {
let id = param.id.unwrap_or_default();
if id == 0 {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("remove_job error,the job id is invalid".to_string()),
));
}
let app_privilege = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.app_privilege.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
if let Ok(Ok(JobManagerResult::JobInfo(Some(info)))) =
share_data.job_manager.send(JobManagerReq::GetJob(id)).await
{
if !app_privilege.check_permission(&info.app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", &info.app_name)),
));
}
}
if let Ok(_) = share_data
.raft_request_route
.request(ClientRequest::JobReq {
req: JobManagerRaftReq::Remove(id),
})
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(())))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("remove_job error".to_string()),
))
}
}
pub(crate) async fn trigger_job(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Json(param): web::Json<TriggerJobParam>,
) -> impl Responder {
let id = param.job_id.unwrap_or_default();
if id == 0 {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("trigger_job error,the job id is invalid".to_string()),
));
}
let session = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
let app_privilege = &session.app_privilege;
let job_info = if let Ok(Ok(JobManagerResult::JobInfo(Some(job_info)))) =
share_data.job_manager.send(JobManagerReq::GetJob(id)).await
{
if !app_privilege.check_permission(&job_info.app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", &job_info.app_name)),
));
}
job_info
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_job_info error".to_string()),
));
};
let task_item = TriggerItem::new_with_user(
now_second_u32(),
job_info,
param.instance_addr.unwrap_or(EMPTY_ARC_STR.clone()),
session.username.clone(),
);
log::info!("trigger_job task_item:{:?}", &task_item);
if let Ok(Ok(_)) = share_data
.task_manager
.send(TaskManagerReq::TriggerTaskList(vec![task_item]))
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(())))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("trigger_job error".to_string()),
))
}
}
pub(crate) async fn query_job_task_logs(
req: actix_web::HttpRequest,
share_data: Data<Arc<ShareData>>,
web::Query(request): web::Query<JobTaskLogQueryListRequest>,
) -> impl Responder {
let param = request.to_param();
let app_privilege = if let Some(session) = req.extensions().get::<Arc<UserSession>>() {
session.app_privilege.clone()
} else {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("user session is invalid".to_string()),
));
};
if let Ok(Ok(JobManagerResult::JobInfo(Some(info)))) = share_data
.job_manager
.send(JobManagerReq::GetJob(param.job_id))
.await
{
if !app_privilege.check_permission(&info.app_name) {
return HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_NO_APP_PERMISSION.to_string(),
Some(format!("user no app permission:{}", &info.app_name)),
));
}
}
if let Ok(Ok(JobManagerResult::JobTaskLogPageInfo(total_count, list))) = share_data
.job_manager
.send(JobManagerReq::QueryJobTaskLog(param))
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(PageResult { total_count, list })))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_job_task_logs error".to_string()),
))
}
}
pub(crate) async fn query_latest_task(
share_data: Data<Arc<ShareData>>,
web::Query(request): web::Query<JobTaskLogQueryListRequest>,
) -> impl Responder {
let param = request.to_param();
if let Ok(Ok(ScheduleManagerResult::JobTaskLogPageInfo(total_count, list))) = share_data
.schedule_manager
.send(ScheduleManagerReq::QueryJobTaskLog(param))
.await
{
HttpResponse::Ok().json(ApiResult::success(Some(PageResult { total_count, list })))
} else {
HttpResponse::Ok().json(ApiResult::<()>::error(
ERROR_CODE_SYSTEM_ERROR.to_string(),
Some("query_latest_task error".to_string()),
))
}
}