use std::sync::Arc;
use axum::extract::{Path, Query, State};
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::Deserialize;
use utoipa::ToSchema;
use crate::api::workflows::AppError;
use crate::api::AppState;
use crate::store::WorkflowStore;
use crate::types::{SchedulePatch, WorkflowSchedule};
pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
Router::new()
.route("/schedules", post(create_schedule).get(list_schedules))
.route(
"/schedules/{name}",
get(get_schedule)
.patch(patch_schedule)
.delete(delete_schedule),
)
.route("/schedules/{name}/pause", post(pause_schedule))
.route("/schedules/{name}/resume", post(resume_schedule))
}
#[derive(Deserialize, ToSchema)]
pub struct CreateScheduleRequest {
pub name: String,
#[serde(default = "default_namespace")]
pub namespace: String,
pub workflow_type: String,
pub cron_expr: String,
#[serde(default = "default_timezone")]
pub timezone: String,
pub input: Option<serde_json::Value>,
#[serde(default = "default_queue")]
pub task_queue: String,
#[serde(default = "default_overlap")]
pub overlap_policy: String,
}
fn default_queue() -> String {
"main".to_string()
}
fn default_namespace() -> String {
"main".to_string()
}
fn default_overlap() -> String {
"skip".to_string()
}
fn default_timezone() -> String {
"UTC".to_string()
}
#[utoipa::path(
post, path = "/api/v1/schedules",
tag = "schedules",
request_body = CreateScheduleRequest,
responses(
(status = 201, description = "Schedule created", body = WorkflowSchedule),
(status = 500, description = "Internal error"),
),
)]
pub async fn create_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Json(req): Json<CreateScheduleRequest>,
) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), AppError> {
let now = timestamp_now();
if !req.timezone.eq_ignore_ascii_case("UTC")
&& req.timezone.parse::<chrono_tz::Tz>().is_err()
{
return Err(AppError::Internal(anyhow::anyhow!(
"invalid timezone: {}",
req.timezone
)));
}
let schedule = WorkflowSchedule {
name: req.name.clone(),
namespace: req.namespace.clone(),
workflow_type: req.workflow_type,
cron_expr: req.cron_expr,
timezone: req.timezone,
input: req.input.map(|v| v.to_string()),
task_queue: req.task_queue,
overlap_policy: req.overlap_policy,
paused: false,
last_run_at: None,
next_run_at: None,
last_workflow_id: None,
created_at: now,
};
state.engine.create_schedule(&schedule).await?;
Ok((
axum::http::StatusCode::CREATED,
Json(serde_json::to_value(schedule)?),
))
}
#[derive(Deserialize)]
pub struct NsQuery {
#[serde(default = "default_namespace")]
namespace: String,
}
#[utoipa::path(
get, path = "/api/v1/schedules",
tag = "schedules",
params(("namespace" = Option<String>, Query, description = "Namespace (default: main)")),
responses((status = 200, description = "List of schedules", body = Vec<WorkflowSchedule>)),
)]
pub async fn list_schedules<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Query(q): Query<NsQuery>,
) -> Result<Json<Vec<serde_json::Value>>, AppError> {
let schedules = state.engine.list_schedules(&q.namespace).await?;
let json: Vec<serde_json::Value> = schedules
.into_iter()
.map(|s| serde_json::to_value(s).unwrap_or_default())
.collect();
Ok(Json(json))
}
#[utoipa::path(
get, path = "/api/v1/schedules/{name}",
tag = "schedules",
params(("name" = String, Path, description = "Schedule name")),
responses(
(status = 200, description = "Schedule details", body = WorkflowSchedule),
(status = 404, description = "Schedule not found"),
),
)]
pub async fn get_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(name): Path<String>,
Query(q): Query<NsQuery>,
) -> Result<Json<serde_json::Value>, AppError> {
let schedule = state
.engine
.get_schedule(&q.namespace, &name)
.await?
.ok_or(AppError::NotFound(format!("schedule {name}")))?;
Ok(Json(serde_json::to_value(schedule)?))
}
#[utoipa::path(
delete, path = "/api/v1/schedules/{name}",
tag = "schedules",
params(("name" = String, Path, description = "Schedule name")),
responses(
(status = 200, description = "Schedule deleted"),
(status = 404, description = "Schedule not found"),
),
)]
pub async fn delete_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(name): Path<String>,
Query(q): Query<NsQuery>,
) -> Result<axum::http::StatusCode, AppError> {
let deleted = state.engine.delete_schedule(&q.namespace, &name).await?;
if deleted {
Ok(axum::http::StatusCode::OK)
} else {
Err(AppError::NotFound(format!("schedule {name}")))
}
}
#[derive(Deserialize, ToSchema)]
pub struct PatchScheduleRequest {
pub cron_expr: Option<String>,
pub timezone: Option<String>,
pub input: Option<serde_json::Value>,
pub task_queue: Option<String>,
pub overlap_policy: Option<String>,
}
#[utoipa::path(
patch, path = "/api/v1/schedules/{name}",
tag = "schedules",
params(
("name" = String, Path, description = "Schedule name"),
("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
),
request_body = PatchScheduleRequest,
responses(
(status = 200, description = "Schedule updated", body = WorkflowSchedule),
(status = 404, description = "Schedule not found"),
),
)]
pub async fn patch_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(name): Path<String>,
Query(q): Query<NsQuery>,
Json(req): Json<PatchScheduleRequest>,
) -> Result<Json<serde_json::Value>, AppError> {
if let Some(ref tz) = req.timezone
&& !tz.eq_ignore_ascii_case("UTC")
&& tz.parse::<chrono_tz::Tz>().is_err()
{
return Err(AppError::Internal(anyhow::anyhow!(
"invalid timezone: {tz}"
)));
}
let patch = SchedulePatch {
cron_expr: req.cron_expr,
timezone: req.timezone,
input: req.input,
task_queue: req.task_queue,
overlap_policy: req.overlap_policy,
};
let updated = state
.engine
.update_schedule(&q.namespace, &name, &patch)
.await?
.ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
Ok(Json(serde_json::to_value(updated)?))
}
#[utoipa::path(
post, path = "/api/v1/schedules/{name}/pause",
tag = "schedules",
params(
("name" = String, Path, description = "Schedule name"),
("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
),
responses(
(status = 200, description = "Schedule paused", body = WorkflowSchedule),
(status = 404, description = "Schedule not found"),
),
)]
pub async fn pause_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(name): Path<String>,
Query(q): Query<NsQuery>,
) -> Result<Json<serde_json::Value>, AppError> {
let updated = state
.engine
.set_schedule_paused(&q.namespace, &name, true)
.await?
.ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
Ok(Json(serde_json::to_value(updated)?))
}
#[utoipa::path(
post, path = "/api/v1/schedules/{name}/resume",
tag = "schedules",
params(
("name" = String, Path, description = "Schedule name"),
("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
),
responses(
(status = 200, description = "Schedule resumed", body = WorkflowSchedule),
(status = 404, description = "Schedule not found"),
),
)]
pub async fn resume_schedule<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(name): Path<String>,
Query(q): Query<NsQuery>,
) -> Result<Json<serde_json::Value>, AppError> {
let updated = state
.engine
.set_schedule_paused(&q.namespace, &name, false)
.await?
.ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
Ok(Json(serde_json::to_value(updated)?))
}
fn timestamp_now() -> f64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64()
}