use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use croner::Cron;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use uuid::Uuid;
use crate::error::AppError;
use crate::paths::job_toml_path;
use crate::storage::{remove_job_dir, write_job};
use crate::utils::time::now_secs;
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, utoipa::ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum CronJobSourceType {
Managed,
System,
}
impl CronJobSourceType {
pub fn from_source(source: &str) -> Self {
if source == "managed" {
Self::Managed
} else {
Self::System
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct CronJob {
pub id: String,
pub schedule: String,
pub handler: String,
pub metadata: serde_json::Value,
pub enabled: bool,
pub source: String,
pub created_at: u64,
pub updated_at: u64,
pub last_triggered_at: Option<u64>,
}
#[derive(Debug, Clone, Serialize, JsonSchema, utoipa::ToSchema)]
pub struct CronJobResponse {
#[serde(flatten)]
pub job: CronJob,
pub source_type: CronJobSourceType,
pub handler_registered: bool,
pub file_path: String,
pub schedule_description: Option<String>,
}
impl CronJobResponse {
pub fn from_job(job: CronJob, handlers: &HashSet<String>) -> Self {
let source_type = CronJobSourceType::from_source(&job.source);
let handler_registered = handlers.contains(&job.handler);
let file_path = job_toml_path(&job.id).to_string_lossy().into_owned();
let schedule_description = job.schedule.parse::<Cron>().ok().map(|c| c.describe());
Self {
job,
source_type,
handler_registered,
file_path,
schedule_description,
}
}
}
pub type CronStore = Arc<Mutex<HashMap<String, CronJob>>>;
pub type HandlerRegistry = Arc<HashSet<String>>;
#[derive(Clone)]
pub struct AppState {
pub store: CronStore,
pub handlers: HandlerRegistry,
pub routines: crate::routines::RoutineStore,
pub uptime_start: u64,
}
impl axum::extract::FromRef<AppState> for CronStore {
fn from_ref(state: &AppState) -> Self {
state.store.clone()
}
}
impl axum::extract::FromRef<AppState> for crate::routines::RoutineStore {
fn from_ref(state: &AppState) -> Self {
state.routines.clone()
}
}
#[cfg(test)]
pub fn new_store() -> CronStore {
Arc::new(Mutex::new(HashMap::new()))
}
pub fn new_registry() -> HandlerRegistry {
Arc::new(HashSet::new())
}
pub(crate) fn normalize_schedule(expr: &str) -> String {
let s = expr.trim();
if s.starts_with('@') {
return s.to_string();
}
let fields: Vec<&str> = s.split_ascii_whitespace().collect();
match fields.len() {
7 => fields[1..6].join(" "),
_ => s.to_string(),
}
}
pub(crate) fn validate_cron(expr: &str) -> Result<(), AppError> {
let normalized = normalize_schedule(expr.trim());
normalized
.parse::<Cron>()
.map_err(|e| AppError::BadRequest(format!("invalid cron expression: {}", e)))?;
Ok(())
}
#[derive(Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct CreateRequest {
pub schedule: String,
pub handler: String,
#[serde(default)]
#[schemars(schema_with = "crate::utils::schema::metadata_schema")]
pub metadata: serde_json::Value,
#[serde(default = "bool_true")]
pub enabled: bool,
}
fn bool_true() -> bool {
true
}
#[derive(Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct UpdateRequest {
pub schedule: Option<String>,
pub handler: Option<String>,
#[schemars(schema_with = "crate::utils::schema::metadata_schema")]
pub metadata: Option<serde_json::Value>,
pub enabled: Option<bool>,
}
pub fn svc_list(store: &CronStore, handlers: &HandlerRegistry) -> Vec<CronJobResponse> {
let lock = store.lock().unwrap();
let mut jobs: Vec<CronJob> = lock.values().cloned().collect();
jobs.sort_by_key(|j| j.created_at);
drop(lock);
jobs.into_iter()
.map(|j| CronJobResponse::from_job(j, handlers))
.collect()
}
pub fn svc_get(
store: &CronStore,
handlers: &HandlerRegistry,
id: &str,
) -> Result<CronJobResponse, AppError> {
let job = store
.lock()
.unwrap()
.get(id)
.cloned()
.ok_or(AppError::NotFound)?;
Ok(CronJobResponse::from_job(job, handlers))
}
pub fn svc_create(
store: &CronStore,
handlers: &HandlerRegistry,
req: CreateRequest,
) -> Result<CronJobResponse, AppError> {
validate_cron(&req.schedule)?;
let now = now_secs();
let job = CronJob {
id: Uuid::new_v4().to_string(),
schedule: normalize_schedule(&req.schedule),
handler: req.handler,
metadata: req.metadata,
enabled: req.enabled,
source: "managed".to_string(),
created_at: now,
updated_at: now,
last_triggered_at: None,
};
write_job(&job).map_err(|_| AppError::Internal)?;
store.lock().unwrap().insert(job.id.clone(), job.clone());
if let Err(e) = crate::sync::sync_to_crontab(store) {
log::warn!("crontab sync after create failed: {e}");
}
Ok(CronJobResponse::from_job(job, handlers))
}
pub fn svc_update(
store: &CronStore,
handlers: &HandlerRegistry,
id: &str,
req: UpdateRequest,
) -> Result<CronJobResponse, AppError> {
if let Some(ref sched) = req.schedule {
validate_cron(sched)?;
}
let mut lock = store.lock().unwrap();
let job = lock.get_mut(id).ok_or(AppError::NotFound)?;
if let Some(s) = req.schedule {
job.schedule = normalize_schedule(&s);
}
if let Some(h) = req.handler {
job.handler = h;
}
if let Some(m) = req.metadata {
job.metadata = m;
}
if let Some(e) = req.enabled {
job.enabled = e;
}
job.updated_at = now_secs();
let job = job.clone();
drop(lock);
write_job(&job).map_err(|_| AppError::Internal)?;
if let Err(e) = crate::sync::sync_to_crontab(store) {
log::warn!("crontab sync after update failed: {e}");
}
Ok(CronJobResponse::from_job(job, handlers))
}
pub fn svc_delete(
store: &CronStore,
handlers: &HandlerRegistry,
id: &str,
) -> Result<CronJobResponse, AppError> {
let job = store.lock().unwrap().remove(id).ok_or(AppError::NotFound)?;
remove_job_dir(id).map_err(|_| AppError::Internal)?;
if let Err(e) = crate::sync::sync_to_crontab(store) {
log::warn!("crontab sync after delete failed: {e}");
}
Ok(CronJobResponse::from_job(job, handlers))
}
pub fn svc_trigger(store: &CronStore, id: &str) -> Result<CronJob, AppError> {
let mut lock = store.lock().unwrap();
let job = lock.get_mut(id).ok_or(AppError::NotFound)?;
job.last_triggered_at = Some(now_secs());
let job = job.clone();
drop(lock);
write_job(&job).map_err(|_| AppError::Internal)?;
let handler_path = crate::paths::handlers_dir().join(&job.handler);
if handler_path.exists() {
if let Err(e) = std::process::Command::new(&handler_path).spawn() {
log::warn!("trigger: failed to spawn handler {:?}: {e}", handler_path);
}
} else {
log::warn!("trigger: handler script not found at {:?}", handler_path);
}
Ok(job)
}
#[utoipa::path(post, path = "/cron-jobs",
request_body = CreateRequest,
responses((status = 201, body = CronJobResponse), (status = 400, description = "Invalid cron expression")))]
pub async fn create(
State(state): State<AppState>,
Json(body): Json<CreateRequest>,
) -> Result<(StatusCode, Json<CronJobResponse>), AppError> {
Ok((
StatusCode::CREATED,
Json(svc_create(&state.store, &state.handlers, body)?),
))
}
#[utoipa::path(get, path = "/cron-jobs",
responses((status = 200, body = Vec<CronJobResponse>)))]
pub async fn list(State(state): State<AppState>) -> Json<Vec<CronJobResponse>> {
Json(svc_list(&state.store, &state.handlers))
}
#[utoipa::path(get, path = "/cron-jobs/{id}",
params(("id" = String, Path, description = "Cron job UUID")),
responses((status = 200, body = CronJobResponse), (status = 404, description = "Not found")))]
pub async fn get(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<CronJobResponse>, AppError> {
Ok(Json(svc_get(&state.store, &state.handlers, &id)?))
}
#[utoipa::path(patch, path = "/cron-jobs/{id}",
params(("id" = String, Path, description = "Cron job UUID")),
request_body = UpdateRequest,
responses((status = 200, body = CronJobResponse), (status = 400, description = "Invalid"), (status = 404, description = "Not found")))]
pub async fn update(
State(state): State<AppState>,
Path(id): Path<String>,
Json(body): Json<UpdateRequest>,
) -> Result<Json<CronJobResponse>, AppError> {
Ok(Json(svc_update(&state.store, &state.handlers, &id, body)?))
}
#[utoipa::path(put, path = "/cron-jobs/{id}",
params(("id" = String, Path, description = "Cron job UUID")),
request_body = UpdateRequest,
responses((status = 200, body = CronJobResponse), (status = 400, description = "Invalid"), (status = 404, description = "Not found")))]
pub async fn replace(
state: State<AppState>,
path: Path<String>,
body: Json<UpdateRequest>,
) -> Result<Json<CronJobResponse>, AppError> {
update(state, path, body).await
}
#[utoipa::path(delete, path = "/cron-jobs/{id}",
params(("id" = String, Path, description = "Cron job UUID")),
responses((status = 200, body = CronJobResponse), (status = 404, description = "Not found")))]
pub async fn delete(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<CronJobResponse>, AppError> {
Ok(Json(svc_delete(&state.store, &state.handlers, &id)?))
}
#[utoipa::path(post, path = "/cron-jobs/{id}/trigger",
params(("id" = String, Path, description = "Cron job UUID")),
responses((status = 200, body = CronJob), (status = 404, description = "Not found")))]
pub async fn trigger(
State(store): State<CronStore>,
Path(id): Path<String>,
) -> Result<Json<CronJob>, AppError> {
Ok(Json(svc_trigger(&store, &id)?))
}
pub fn svc_logs_path(store: &CronStore, id: &str) -> Result<std::path::PathBuf, AppError> {
if !store.lock().unwrap().contains_key(id) {
return Err(AppError::NotFound);
}
Ok(crate::paths::job_log_path(id))
}
#[utoipa::path(get, path = "/cron-jobs/{id}/logs",
params(("id" = String, Path, description = "Cron job UUID")),
responses((status = 200, description = "Log file contents as plain text"), (status = 404, description = "Not found")))]
pub async fn get_logs(
State(store): State<CronStore>,
Path(id): Path<String>,
) -> Result<String, AppError> {
let log_path = svc_logs_path(&store, &id)?;
if !log_path.exists() {
return Ok(String::new());
}
tokio::fs::read_to_string(&log_path)
.await
.map_err(|_| AppError::Internal)
}
#[cfg(test)]
#[path = "cron_jobs_tests.rs"]
mod cron_jobs_tests;