Skip to main content

assay_workflow/api/
schedules.rs

1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::Deserialize;
7use utoipa::ToSchema;
8
9use crate::api::workflows::AppError;
10use crate::api::AppState;
11use crate::store::WorkflowStore;
12use crate::types::WorkflowSchedule;
13
14pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
15    Router::new()
16        .route("/schedules", post(create_schedule).get(list_schedules))
17        .route(
18            "/schedules/{name}",
19            get(get_schedule).delete(delete_schedule),
20        )
21}
22
23#[derive(Deserialize, ToSchema)]
24pub struct CreateScheduleRequest {
25    /// Unique schedule name
26    pub name: String,
27    /// Namespace (default: "main")
28    #[serde(default = "default_namespace")]
29    pub namespace: String,
30    /// Workflow type to start on each trigger
31    pub workflow_type: String,
32    /// Cron expression (e.g. "0 * * * *" for hourly)
33    pub cron_expr: String,
34    /// Optional JSON input passed to each workflow run
35    pub input: Option<serde_json::Value>,
36    /// Task queue for created workflows (default: "main")
37    #[serde(default = "default_queue")]
38    pub task_queue: String,
39    /// Overlap policy: skip, queue, cancel_old, allow_all (default: "skip")
40    #[serde(default = "default_overlap")]
41    pub overlap_policy: String,
42}
43
44fn default_queue() -> String {
45    "main".to_string()
46}
47
48fn default_namespace() -> String {
49    "main".to_string()
50}
51
52fn default_overlap() -> String {
53    "skip".to_string()
54}
55
56#[utoipa::path(
57    post, path = "/api/v1/schedules",
58    tag = "schedules",
59    request_body = CreateScheduleRequest,
60    responses(
61        (status = 201, description = "Schedule created", body = WorkflowSchedule),
62        (status = 500, description = "Internal error"),
63    ),
64)]
65pub async fn create_schedule<S: WorkflowStore>(
66    State(state): State<Arc<AppState<S>>>,
67    Json(req): Json<CreateScheduleRequest>,
68) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), AppError> {
69    let now = timestamp_now();
70
71    let schedule = WorkflowSchedule {
72        name: req.name.clone(),
73        namespace: req.namespace.clone(),
74        workflow_type: req.workflow_type,
75        cron_expr: req.cron_expr,
76        input: req.input.map(|v| v.to_string()),
77        task_queue: req.task_queue,
78        overlap_policy: req.overlap_policy,
79        paused: false,
80        last_run_at: None,
81        next_run_at: None,
82        last_workflow_id: None,
83        created_at: now,
84    };
85
86    state.engine.create_schedule(&schedule).await?;
87
88    Ok((
89        axum::http::StatusCode::CREATED,
90        Json(serde_json::to_value(schedule)?),
91    ))
92}
93
94#[derive(Deserialize)]
95pub struct NsQuery {
96    #[serde(default = "default_namespace")]
97    namespace: String,
98}
99
100#[utoipa::path(
101    get, path = "/api/v1/schedules",
102    tag = "schedules",
103    params(("namespace" = Option<String>, Query, description = "Namespace (default: main)")),
104    responses((status = 200, description = "List of schedules", body = Vec<WorkflowSchedule>)),
105)]
106pub async fn list_schedules<S: WorkflowStore>(
107    State(state): State<Arc<AppState<S>>>,
108    Query(q): Query<NsQuery>,
109) -> Result<Json<Vec<serde_json::Value>>, AppError> {
110    let schedules = state.engine.list_schedules(&q.namespace).await?;
111    let json: Vec<serde_json::Value> = schedules
112        .into_iter()
113        .map(|s| serde_json::to_value(s).unwrap_or_default())
114        .collect();
115    Ok(Json(json))
116}
117
118#[utoipa::path(
119    get, path = "/api/v1/schedules/{name}",
120    tag = "schedules",
121    params(("name" = String, Path, description = "Schedule name")),
122    responses(
123        (status = 200, description = "Schedule details", body = WorkflowSchedule),
124        (status = 404, description = "Schedule not found"),
125    ),
126)]
127pub async fn get_schedule<S: WorkflowStore>(
128    State(state): State<Arc<AppState<S>>>,
129    Path(name): Path<String>,
130    Query(q): Query<NsQuery>,
131) -> Result<Json<serde_json::Value>, AppError> {
132    let schedule = state
133        .engine
134        .get_schedule(&q.namespace, &name)
135        .await?
136        .ok_or(AppError::NotFound(format!("schedule {name}")))?;
137
138    Ok(Json(serde_json::to_value(schedule)?))
139}
140
141#[utoipa::path(
142    delete, path = "/api/v1/schedules/{name}",
143    tag = "schedules",
144    params(("name" = String, Path, description = "Schedule name")),
145    responses(
146        (status = 200, description = "Schedule deleted"),
147        (status = 404, description = "Schedule not found"),
148    ),
149)]
150pub async fn delete_schedule<S: WorkflowStore>(
151    State(state): State<Arc<AppState<S>>>,
152    Path(name): Path<String>,
153    Query(q): Query<NsQuery>,
154) -> Result<axum::http::StatusCode, AppError> {
155    let deleted = state.engine.delete_schedule(&q.namespace, &name).await?;
156    if deleted {
157        Ok(axum::http::StatusCode::OK)
158    } else {
159        Err(AppError::NotFound(format!("schedule {name}")))
160    }
161}
162
163fn timestamp_now() -> f64 {
164    std::time::SystemTime::now()
165        .duration_since(std::time::UNIX_EPOCH)
166        .unwrap()
167        .as_secs_f64()
168}