use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::{delete, get, post},
};
use std::sync::Arc;
use zart::{DurableScheduler, admin::PauseScope, admin::RerunSpec};
use crate::{
models::{
ErrorResponse, ExecutionDetailResponse, ExecutionResponse, PauseRequest, PauseRuleResponse,
RerunRequest, RerunResponse, RestartRequest, RestartResponse, RetryStepRequest,
RetryStepResponse, RunRecordResponse, StepAttemptResponse, StepDetailResponse,
},
state::AdminState,
};
pub fn admin_router(scheduler: Arc<DurableScheduler>) -> Router {
Router::new()
.route(
"/admin/v1/executions/{execution_id}/retry-step",
post(retry_step),
)
.route("/admin/v1/executions/{execution_id}/restart", post(restart))
.route("/admin/v1/executions/{execution_id}/rerun", post(rerun))
.route("/admin/v1/executions/{execution_id}/runs", get(list_runs))
.route(
"/admin/v1/executions/{execution_id}/detail",
get(execution_detail),
)
.route("/admin/v1/pause", post(create_pause))
.route("/admin/v1/pause", get(list_pauses))
.route("/admin/v1/pause/{rule_id}", post(resume_rule))
.route("/admin/v1/pause/{rule_id}", delete(delete_pause_rule))
.with_state(AdminState { scheduler })
}
async fn retry_step(
State(state): State<AdminState>,
Path(execution_id): Path<String>,
Json(req): Json<RetryStepRequest>,
) -> Response {
let run_id = match get_current_run_id(&state.scheduler, &execution_id).await {
Some(id) => id,
None => return not_found(&execution_id),
};
match state
.scheduler
.retry_step(&run_id, &req.step_name, req.triggered_by.as_deref())
.await
{
Ok(new_task_id) => {
(StatusCode::OK, Json(RetryStepResponse { new_task_id })).into_response()
}
Err(zart::error::SchedulerError::Database(_))
| Err(zart::error::SchedulerError::ExecutionNotFound(_)) => not_found(&execution_id),
Err(e) => scheduler_error_response(e),
}
}
async fn restart(
State(state): State<AdminState>,
Path(execution_id): Path<String>,
Json(req): Json<RestartRequest>,
) -> Response {
match state
.scheduler
.restart(&execution_id, req.payload, req.triggered_by.as_deref())
.await
{
Ok(new_run_id) => (StatusCode::OK, Json(RestartResponse { new_run_id })).into_response(),
Err(zart::error::SchedulerError::ExecutionNotFound(_)) => not_found(&execution_id),
Err(e) => scheduler_error_response(e),
}
}
async fn rerun(
State(state): State<AdminState>,
Path(execution_id): Path<String>,
Json(req): Json<RerunRequest>,
) -> Response {
let spec = RerunSpec {
force_rerun: req.rerun_steps,
preserve: req.preserve_steps,
triggered_by: req.triggered_by,
};
match state.scheduler.rerun_steps(&execution_id, spec).await {
Ok(result) => {
let body = RerunResponse {
new_run_number: result.new_run_number,
effective_rerun: result.effective_rerun,
};
(StatusCode::OK, Json(body)).into_response()
}
Err(zart::error::SchedulerError::ExecutionNotFound(_)) => not_found(&execution_id),
Err(e) => scheduler_error_response(e),
}
}
async fn list_runs(State(state): State<AdminState>, Path(execution_id): Path<String>) -> Response {
match state.scheduler.list_runs(&execution_id).await {
Ok(runs) => {
let body: Vec<RunRecordResponse> = runs
.into_iter()
.map(|r| RunRecordResponse {
run_id: r.run_id,
execution_id: r.execution_id,
run_index: r.run_index,
payload: r.payload,
status: r.status.to_string(),
result: r.result,
started_at: r.started_at,
completed_at: r.completed_at,
trigger: format!("{:?}", r.trigger).to_lowercase(),
})
.collect();
(StatusCode::OK, Json(body)).into_response()
}
Err(zart::error::SchedulerError::ExecutionNotFound(_)) => not_found(&execution_id),
Err(e) => scheduler_error_response(e),
}
}
async fn create_pause(State(state): State<AdminState>, Json(req): Json<PauseRequest>) -> Response {
if let Some(ref expires_at) = req.expires_at
&& *expires_at <= chrono::Utc::now()
{
return (
StatusCode::UNPROCESSABLE_ENTITY,
Json(ErrorResponse {
error: "expiresAt must be in the future".into(),
}),
)
.into_response();
}
let scope = PauseScope {
execution_id: req.execution_id,
task_name: req.task_name,
step_pattern: req.step_pattern,
expires_at: req.expires_at,
triggered_by: req.triggered_by,
};
match state.scheduler.pause(scope).await {
Ok(rule) => {
let body = PauseRuleResponse {
rule_id: rule.rule_id,
execution_id: rule.scope.execution_id,
task_name: rule.scope.task_name,
step_pattern: rule.scope.step_pattern,
created_at: rule.created_at,
expires_at: rule.scope.expires_at,
created_by: rule.scope.triggered_by,
deleted_at: rule.deleted_at,
};
(StatusCode::CREATED, Json(body)).into_response()
}
Err(e) => scheduler_error_response(e),
}
}
async fn list_pauses(State(state): State<AdminState>) -> Response {
match state.scheduler.list_pause_rules(None).await {
Ok(rules) => {
let body: Vec<PauseRuleResponse> = rules
.into_iter()
.map(|r| PauseRuleResponse {
rule_id: r.rule_id,
execution_id: r.scope.execution_id,
task_name: r.scope.task_name,
step_pattern: r.scope.step_pattern,
created_at: r.created_at,
expires_at: r.scope.expires_at,
created_by: r.scope.triggered_by,
deleted_at: r.deleted_at,
})
.collect();
(StatusCode::OK, Json(body)).into_response()
}
Err(e) => scheduler_error_response(e),
}
}
async fn resume_rule(State(state): State<AdminState>, Path(rule_id): Path<String>) -> Response {
match state.scheduler.resume_rule_by_id(&rule_id, None).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("pause rule '{rule_id}' not found"),
}),
)
.into_response(),
Err(e) => scheduler_error_response(e),
}
}
async fn delete_pause_rule(
State(state): State<AdminState>,
Path(rule_id): Path<String>,
) -> Response {
match state.scheduler.resume_rule_by_id(&rule_id, None).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("pause rule '{rule_id}' not found"),
}),
)
.into_response(),
Err(e) => scheduler_error_response(e),
}
}
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct DetailQuery {
run_id: Option<String>,
}
async fn execution_detail(
State(state): State<AdminState>,
Path(execution_id): Path<String>,
Query(query): Query<DetailQuery>,
) -> Response {
match state
.scheduler
.execution_detail(&execution_id, query.run_id.as_deref())
.await
{
Ok(detail) => {
let execution: ExecutionResponse = detail.execution.into();
let runs: Vec<RunRecordResponse> = detail
.runs
.into_iter()
.map(|r| RunRecordResponse {
run_id: r.run_id,
execution_id: r.execution_id,
run_index: r.run_index,
payload: r.payload,
status: r.status.to_string(),
result: r.result,
started_at: r.started_at,
completed_at: r.completed_at,
trigger: format!("{:?}", r.trigger).to_lowercase(),
})
.collect();
let steps: Vec<StepDetailResponse> = detail
.steps
.into_iter()
.map(|s| {
let attempts: Vec<StepAttemptResponse> = s
.attempts
.into_iter()
.map(|a| StepAttemptResponse {
attempt_number: a.attempt_number,
status: format!("{:?}", a.status).to_lowercase(),
result: a.result,
error: a.error,
started_at: a.started_at,
completed_at: a.completed_at,
})
.collect();
StepDetailResponse {
step_id: s.step.step_id,
name: s.step.step_name,
kind: format!("{:?}", s.step.step_kind).to_lowercase(),
status: format!("{:?}", s.step.status).to_lowercase(),
retry_attempt: s.step.retry_attempt,
result: s.step.result,
last_error: s.step.last_error,
retryable: s.retryable,
scheduled_at: s.step.scheduled_at,
completed_at: s.step.completed_at,
attempts,
}
})
.collect();
let body = ExecutionDetailResponse {
execution,
runs,
steps,
};
(StatusCode::OK, Json(body)).into_response()
}
Err(zart::error::SchedulerError::ExecutionNotFound(_)) => not_found(&execution_id),
Err(e) => scheduler_error_response(e),
}
}
async fn get_current_run_id(scheduler: &DurableScheduler, execution_id: &str) -> Option<String> {
scheduler.get_current_run_id(execution_id).await.ok()?
}
fn not_found(execution_id: &str) -> Response {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("execution '{execution_id}' not found"),
}),
)
.into_response()
}
fn scheduler_error_response(e: zart::error::SchedulerError) -> Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response()
}