1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::Deserialize;
7use utoipa::ToSchema;
8
9use crate::api::workflows::AppError;
10use crate::api::AppState;
11use crate::store::WorkflowStore;
12use crate::types::{SchedulePatch, WorkflowSchedule};
13
14pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
15 Router::new()
16 .route("/schedules", post(create_schedule).get(list_schedules))
17 .route(
18 "/schedules/{name}",
19 get(get_schedule)
20 .patch(patch_schedule)
21 .delete(delete_schedule),
22 )
23 .route("/schedules/{name}/pause", post(pause_schedule))
24 .route("/schedules/{name}/resume", post(resume_schedule))
25}
26
27#[derive(Deserialize, ToSchema)]
28pub struct CreateScheduleRequest {
29 pub name: String,
31 #[serde(default = "default_namespace")]
33 pub namespace: String,
34 pub workflow_type: String,
36 pub cron_expr: String,
38 #[serde(default = "default_timezone")]
41 pub timezone: String,
42 pub input: Option<serde_json::Value>,
44 #[serde(default = "default_queue")]
46 pub task_queue: String,
47 #[serde(default = "default_overlap")]
49 pub overlap_policy: String,
50}
51
52fn default_queue() -> String {
53 "main".to_string()
54}
55
56fn default_namespace() -> String {
57 "main".to_string()
58}
59
60fn default_overlap() -> String {
61 "skip".to_string()
62}
63
64fn default_timezone() -> String {
65 "UTC".to_string()
66}
67
68#[utoipa::path(
69 post, path = "/api/v1/schedules",
70 tag = "schedules",
71 request_body = CreateScheduleRequest,
72 responses(
73 (status = 201, description = "Schedule created", body = WorkflowSchedule),
74 (status = 500, description = "Internal error"),
75 ),
76)]
77pub async fn create_schedule<S: WorkflowStore>(
78 State(state): State<Arc<AppState<S>>>,
79 Json(req): Json<CreateScheduleRequest>,
80) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), AppError> {
81 let now = timestamp_now();
82
83 if !req.timezone.eq_ignore_ascii_case("UTC")
86 && req.timezone.parse::<chrono_tz::Tz>().is_err()
87 {
88 return Err(AppError::Internal(anyhow::anyhow!(
89 "invalid timezone: {}",
90 req.timezone
91 )));
92 }
93
94 let schedule = WorkflowSchedule {
95 name: req.name.clone(),
96 namespace: req.namespace.clone(),
97 workflow_type: req.workflow_type,
98 cron_expr: req.cron_expr,
99 timezone: req.timezone,
100 input: req.input.map(|v| v.to_string()),
101 task_queue: req.task_queue,
102 overlap_policy: req.overlap_policy,
103 paused: false,
104 last_run_at: None,
105 next_run_at: None,
106 last_workflow_id: None,
107 created_at: now,
108 };
109
110 state.engine.create_schedule(&schedule).await?;
111
112 Ok((
113 axum::http::StatusCode::CREATED,
114 Json(serde_json::to_value(schedule)?),
115 ))
116}
117
118#[derive(Deserialize)]
119pub struct NsQuery {
120 #[serde(default = "default_namespace")]
121 namespace: String,
122}
123
124#[utoipa::path(
125 get, path = "/api/v1/schedules",
126 tag = "schedules",
127 params(("namespace" = Option<String>, Query, description = "Namespace (default: main)")),
128 responses((status = 200, description = "List of schedules", body = Vec<WorkflowSchedule>)),
129)]
130pub async fn list_schedules<S: WorkflowStore>(
131 State(state): State<Arc<AppState<S>>>,
132 Query(q): Query<NsQuery>,
133) -> Result<Json<Vec<serde_json::Value>>, AppError> {
134 let schedules = state.engine.list_schedules(&q.namespace).await?;
135 let json: Vec<serde_json::Value> = schedules
136 .into_iter()
137 .map(|s| serde_json::to_value(s).unwrap_or_default())
138 .collect();
139 Ok(Json(json))
140}
141
142#[utoipa::path(
143 get, path = "/api/v1/schedules/{name}",
144 tag = "schedules",
145 params(("name" = String, Path, description = "Schedule name")),
146 responses(
147 (status = 200, description = "Schedule details", body = WorkflowSchedule),
148 (status = 404, description = "Schedule not found"),
149 ),
150)]
151pub async fn get_schedule<S: WorkflowStore>(
152 State(state): State<Arc<AppState<S>>>,
153 Path(name): Path<String>,
154 Query(q): Query<NsQuery>,
155) -> Result<Json<serde_json::Value>, AppError> {
156 let schedule = state
157 .engine
158 .get_schedule(&q.namespace, &name)
159 .await?
160 .ok_or(AppError::NotFound(format!("schedule {name}")))?;
161
162 Ok(Json(serde_json::to_value(schedule)?))
163}
164
165#[utoipa::path(
166 delete, path = "/api/v1/schedules/{name}",
167 tag = "schedules",
168 params(("name" = String, Path, description = "Schedule name")),
169 responses(
170 (status = 200, description = "Schedule deleted"),
171 (status = 404, description = "Schedule not found"),
172 ),
173)]
174pub async fn delete_schedule<S: WorkflowStore>(
175 State(state): State<Arc<AppState<S>>>,
176 Path(name): Path<String>,
177 Query(q): Query<NsQuery>,
178) -> Result<axum::http::StatusCode, AppError> {
179 let deleted = state.engine.delete_schedule(&q.namespace, &name).await?;
180 if deleted {
181 Ok(axum::http::StatusCode::OK)
182 } else {
183 Err(AppError::NotFound(format!("schedule {name}")))
184 }
185}
186
187#[derive(Deserialize, ToSchema)]
188pub struct PatchScheduleRequest {
189 pub cron_expr: Option<String>,
191 pub timezone: Option<String>,
193 pub input: Option<serde_json::Value>,
196 pub task_queue: Option<String>,
198 pub overlap_policy: Option<String>,
200}
201
202#[utoipa::path(
203 patch, path = "/api/v1/schedules/{name}",
204 tag = "schedules",
205 params(
206 ("name" = String, Path, description = "Schedule name"),
207 ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
208 ),
209 request_body = PatchScheduleRequest,
210 responses(
211 (status = 200, description = "Schedule updated", body = WorkflowSchedule),
212 (status = 404, description = "Schedule not found"),
213 ),
214)]
215pub async fn patch_schedule<S: WorkflowStore>(
216 State(state): State<Arc<AppState<S>>>,
217 Path(name): Path<String>,
218 Query(q): Query<NsQuery>,
219 Json(req): Json<PatchScheduleRequest>,
220) -> Result<Json<serde_json::Value>, AppError> {
221 if let Some(ref tz) = req.timezone
223 && !tz.eq_ignore_ascii_case("UTC")
224 && tz.parse::<chrono_tz::Tz>().is_err()
225 {
226 return Err(AppError::Internal(anyhow::anyhow!(
227 "invalid timezone: {tz}"
228 )));
229 }
230
231 let patch = SchedulePatch {
232 cron_expr: req.cron_expr,
233 timezone: req.timezone,
234 input: req.input,
235 task_queue: req.task_queue,
236 overlap_policy: req.overlap_policy,
237 };
238
239 let updated = state
240 .engine
241 .update_schedule(&q.namespace, &name, &patch)
242 .await?
243 .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
244
245 Ok(Json(serde_json::to_value(updated)?))
246}
247
248#[utoipa::path(
249 post, path = "/api/v1/schedules/{name}/pause",
250 tag = "schedules",
251 params(
252 ("name" = String, Path, description = "Schedule name"),
253 ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
254 ),
255 responses(
256 (status = 200, description = "Schedule paused", body = WorkflowSchedule),
257 (status = 404, description = "Schedule not found"),
258 ),
259)]
260pub async fn pause_schedule<S: WorkflowStore>(
261 State(state): State<Arc<AppState<S>>>,
262 Path(name): Path<String>,
263 Query(q): Query<NsQuery>,
264) -> Result<Json<serde_json::Value>, AppError> {
265 let updated = state
266 .engine
267 .set_schedule_paused(&q.namespace, &name, true)
268 .await?
269 .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
270 Ok(Json(serde_json::to_value(updated)?))
271}
272
273#[utoipa::path(
274 post, path = "/api/v1/schedules/{name}/resume",
275 tag = "schedules",
276 params(
277 ("name" = String, Path, description = "Schedule name"),
278 ("namespace" = Option<String>, Query, description = "Namespace (default: main)"),
279 ),
280 responses(
281 (status = 200, description = "Schedule resumed", body = WorkflowSchedule),
282 (status = 404, description = "Schedule not found"),
283 ),
284)]
285pub async fn resume_schedule<S: WorkflowStore>(
286 State(state): State<Arc<AppState<S>>>,
287 Path(name): Path<String>,
288 Query(q): Query<NsQuery>,
289) -> Result<Json<serde_json::Value>, AppError> {
290 let updated = state
291 .engine
292 .set_schedule_paused(&q.namespace, &name, false)
293 .await?
294 .ok_or_else(|| AppError::NotFound(format!("schedule {name}")))?;
295 Ok(Json(serde_json::to_value(updated)?))
296}
297
298fn timestamp_now() -> f64 {
299 std::time::SystemTime::now()
300 .duration_since(std::time::UNIX_EPOCH)
301 .unwrap()
302 .as_secs_f64()
303}