Skip to main content

assay_workflow/api/
schedules.rs

1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::Deserialize;
7use utoipa::ToSchema;
8
9use crate::api::workflows::AppError;
10use crate::api::AppState;
11use crate::store::WorkflowStore;
12use crate::types::{SchedulePatch, WorkflowSchedule};
13
14pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
15    Router::new()
16        .route("/schedules", post(create_schedule).get(list_schedules))
17        .route(
18            "/schedules/{name}",
19            get(get_schedule)
20                .patch(patch_schedule)
21                .delete(delete_schedule),
22        )
23        .route("/schedules/{name}/pause", post(pause_schedule))
24        .route("/schedules/{name}/resume", post(resume_schedule))
25}
26
27#[derive(Deserialize, ToSchema)]
28pub struct CreateScheduleRequest {
29    /// Unique schedule name
30    pub name: String,
31    /// Namespace (default: "main")
32    #[serde(default = "default_namespace")]
33    pub namespace: String,
34    /// Workflow type to start on each trigger
35    pub workflow_type: String,
36    /// Cron expression (e.g. "0 * * * *" for hourly)
37    pub cron_expr: String,
38    /// IANA time-zone name used to interpret `cron_expr`
39    /// (e.g. "Europe/Berlin", "America/New_York"). Default: "UTC".
40    #[serde(default = "default_timezone")]
41    pub timezone: String,
42    /// Optional JSON input passed to each workflow run
43    pub input: Option<serde_json::Value>,
44    /// Task queue for created workflows (default: "main")
45    #[serde(default = "default_queue")]
46    pub task_queue: String,
47    /// Overlap policy: skip, queue, cancel_old, allow_all (default: "skip")
48    #[serde(default = "default_overlap")]
49    pub overlap_policy: String,
50}
51
52fn default_queue() -> String {
53    "main".to_string()
54}
55
56fn default_namespace() -> String {
57    "main".to_string()
58}
59
60fn default_overlap() -> String {
61    "skip".to_string()
62}
63
64fn default_timezone() -> String {
65    "UTC".to_string()
66}
67
68#[utoipa::path(
69    post, path = "/api/v1/schedules",
70    tag = "schedules",
71    request_body = CreateScheduleRequest,
72    responses(
73        (status = 201, description = "Schedule created", body = WorkflowSchedule),
74        (status = 500, description = "Internal error"),
75    ),
76)]
77pub async fn create_schedule<S: WorkflowStore>(
78    State(state): State<Arc<AppState<S>>>,
79    Json(req): Json<CreateScheduleRequest>,
80) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), AppError> {
81    let now = timestamp_now();
82
83    // Validate the timezone early so a bad value produces a clean 400
84    // instead of a mysterious silent no-op from the scheduler later.
85    if !req.timezone.eq_ignore_ascii_case("UTC")
86        && req.timezone.parse::<chrono_tz::Tz>().is_err()
87    {
88        return Err(AppError::Internal(anyhow::anyhow!(
89            "invalid timezone: {}",
90            req.timezone
91        )));
92    }
93
94    let schedule = WorkflowSchedule {
95        name: req.name.clone(),
96        namespace: req.namespace.clone(),
97        workflow_type: req.workflow_type,
98        cron_expr: req.cron_expr,
99        timezone: req.timezone,
100        input: req.input.map(|v| v.to_string()),
101        task_queue: req.task_queue,
102        overlap_policy: req.overlap_policy,
103        paused: false,
104        last_run_at: None,
105        next_run_at: None,
106        last_workflow_id: None,
107        created_at: now,
108    };
109
110    state.engine.create_schedule(&schedule).await?;
111
112    Ok((
113        axum::http::StatusCode::CREATED,
114        Json(serde_json::to_value(schedule)?),
115    ))
116}
117
118#[derive(Deserialize)]
119pub struct NsQuery {
120    #[serde(default = "default_namespace")]
121    namespace: String,
122}
123
124#[utoipa::path(
125    get, path = "/api/v1/schedules",
126    tag = "schedules",
127    params(("namespace" = Option<String>, Query, description = "Namespace (default: main)")),
128    responses((status = 200, description = "List of schedules", body = Vec<WorkflowSchedule>)),
129)]
130pub async fn list_schedules<S: WorkflowStore>(
131    State(state): State<Arc<AppState<S>>>,
132    Query(q): Query<NsQuery>,
133) -> Result<Json<Vec<serde_json::Value>>, AppError> {
134    let schedules = state.engine.list_schedules(&q.namespace).await?;
135    let json: Vec<serde_json::Value> = schedules
136        .into_iter()
137        .map(|s| serde_json::to_value(s).unwrap_or_default())
138        .collect();
139    Ok(Json(json))
140}
141
142#[utoipa::path(
143    get, path = "/api/v1/schedules/{name}",
144    tag = "schedules",
145    params(("name" = String, Path, description = "Schedule name")),
146    responses(
147        (status = 200, description = "Schedule details", body = WorkflowSchedule),
148        (status = 404, description = "Schedule not found"),
149    ),
150)]
151pub async fn get_schedule<S: WorkflowStore>(
152    State(state): State<Arc<AppState<S>>>,
153    Path(name): Path<String>,
154    Query(q): Query<NsQuery>,
155) -> Result<Json<serde_json::Value>, AppError> {
156    let schedule = state
157        .engine
158        .get_schedule(&q.namespace, &name)
159        .await?
160        .ok_or(AppError::NotFound(format!("schedule {name}")))?;
161
162    Ok(Json(serde_json::to_value(schedule)?))
163}
164
165#[utoipa::path(
166    delete, path = "/api/v1/schedules/{name}",
167    tag = "schedules",
168    params(("name" = String, Path, description = "Schedule name")),
169    responses(
170        (status = 200, description = "Schedule deleted"),
171        (status = 404, description = "Schedule not found"),
172    ),
173)]
174pub async fn delete_schedule<S: WorkflowStore>(
175    State(state): State<Arc<AppState<S>>>,
176    Path(name): Path<String>,
177    Query(q): Query<NsQuery>,
178) -> Result<axum::http::StatusCode, AppError> {
179    let deleted = state.engine.delete_schedule(&q.namespace, &name).await?;
180    if deleted {
181        Ok(axum::http::StatusCode::OK)
182    } else {
183        Err(AppError::NotFound(format!("schedule {name}")))
184    }
185}
186
187#[derive(Deserialize, ToSchema)]
188pub struct PatchScheduleRequest {
189    /// New cron expression (leave null to keep the existing one).
190    pub cron_expr: Option<String>,
191    /// New IANA timezone (e.g. "Europe/Berlin"; leave null to keep).
192    pub timezone: Option<String>,
193    /// New JSON input passed to each workflow run. Send `null` literally
194    /// to preserve; use `{}` to pass an empty object.
195    pub input: Option<serde_json::Value>,
196    /// New task queue for created workflows.
197    pub task_queue: Option<String>,
198    /// New overlap policy (skip, queue, cancel_old, allow_all).
199    pub overlap_policy: Option<String>,
200}
201
202#[utoipa::path(
203    patch, path = "/api/v1/schedules/{name}",
204    tag = "schedules",
205    params(
206        ("name" = String, Path, description = "Schedule name"),
207        ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
208    ),
209    request_body = PatchScheduleRequest,
210    responses(
211        (status = 200, description = "Schedule updated", body = WorkflowSchedule),
212        (status = 404, description = "Schedule not found"),
213    ),
214)]
215pub async fn patch_schedule<S: WorkflowStore>(
216    State(state): State<Arc<AppState<S>>>,
217    Path(name): Path<String>,
218    Query(q): Query<NsQuery>,
219    Json(req): Json<PatchScheduleRequest>,
220) -> Result<Json<serde_json::Value>, AppError> {
221    // Validate timezone before committing the write — same as create.
222    if let Some(ref tz) = req.timezone
223        && !tz.eq_ignore_ascii_case("UTC")
224        && tz.parse::<chrono_tz::Tz>().is_err()
225    {
226        return Err(AppError::Internal(anyhow::anyhow!(
227            "invalid timezone: {tz}"
228        )));
229    }
230
231    let patch = SchedulePatch {
232        cron_expr: req.cron_expr,
233        timezone: req.timezone,
234        input: req.input,
235        task_queue: req.task_queue,
236        overlap_policy: req.overlap_policy,
237    };
238
239    let updated = state
240        .engine
241        .update_schedule(&q.namespace, &name, &patch)
242        .await?
243        .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
244
245    Ok(Json(serde_json::to_value(updated)?))
246}
247
248#[utoipa::path(
249    post, path = "/api/v1/schedules/{name}/pause",
250    tag = "schedules",
251    params(
252        ("name" = String, Path, description = "Schedule name"),
253        ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
254    ),
255    responses(
256        (status = 200, description = "Schedule paused", body = WorkflowSchedule),
257        (status = 404, description = "Schedule not found"),
258    ),
259)]
260pub async fn pause_schedule<S: WorkflowStore>(
261    State(state): State<Arc<AppState<S>>>,
262    Path(name): Path<String>,
263    Query(q): Query<NsQuery>,
264) -> Result<Json<serde_json::Value>, AppError> {
265    let updated = state
266        .engine
267        .set_schedule_paused(&q.namespace, &name, true)
268        .await?
269        .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
270    Ok(Json(serde_json::to_value(updated)?))
271}
272
273#[utoipa::path(
274    post, path = "/api/v1/schedules/{name}/resume",
275    tag = "schedules",
276    params(
277        ("name" = String, Path, description = "Schedule name"),
278        ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
279    ),
280    responses(
281        (status = 200, description = "Schedule resumed", body = WorkflowSchedule),
282        (status = 404, description = "Schedule not found"),
283    ),
284)]
285pub async fn resume_schedule<S: WorkflowStore>(
286    State(state): State<Arc<AppState<S>>>,
287    Path(name): Path<String>,
288    Query(q): Query<NsQuery>,
289) -> Result<Json<serde_json::Value>, AppError> {
290    let updated = state
291        .engine
292        .set_schedule_paused(&q.namespace, &name, false)
293        .await?
294        .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
295    Ok(Json(serde_json::to_value(updated)?))
296}
297
298fn timestamp_now() -> f64 {
299    std::time::SystemTime::now()
300        .duration_since(std::time::UNIX_EPOCH)
301        .unwrap()
302        .as_secs_f64()
303}