1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::Deserialize;
7use utoipa::ToSchema;
8
9use crate::api::workflows::AppError;
10use crate::api::AppState;
11use crate::store::WorkflowStore;
12use crate::types::WorkflowSchedule;
13
14pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
15 Router::new()
16 .route("/schedules", post(create_schedule).get(list_schedules))
17 .route(
18 "/schedules/{name}",
19 get(get_schedule).delete(delete_schedule),
20 )
21}
22
23#[derive(Deserialize, ToSchema)]
24pub struct CreateScheduleRequest {
25 pub name: String,
27 #[serde(default = "default_namespace")]
29 pub namespace: String,
30 pub workflow_type: String,
32 pub cron_expr: String,
34 pub input: Option<serde_json::Value>,
36 #[serde(default = "default_queue")]
38 pub task_queue: String,
39 #[serde(default = "default_overlap")]
41 pub overlap_policy: String,
42}
43
/// Serde default for the task queue name: `"main"`.
fn default_queue() -> String {
    String::from("main")
}
47
/// Serde default for the namespace: `"main"`.
fn default_namespace() -> String {
    String::from("main")
}
51
/// Serde default for the overlap policy: `"skip"`.
fn default_overlap() -> String {
    String::from("skip")
}
55
56#[utoipa::path(
57 post, path = "/api/v1/schedules",
58 tag = "schedules",
59 request_body = CreateScheduleRequest,
60 responses(
61 (status = 201, description = "Schedule created", body = WorkflowSchedule),
62 (status = 500, description = "Internal error"),
63 ),
64)]
65pub async fn create_schedule<S: WorkflowStore>(
66 State(state): State<Arc<AppState<S>>>,
67 Json(req): Json<CreateScheduleRequest>,
68) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), AppError> {
69 let now = timestamp_now();
70
71 let schedule = WorkflowSchedule {
72 name: req.name.clone(),
73 namespace: req.namespace.clone(),
74 workflow_type: req.workflow_type,
75 cron_expr: req.cron_expr,
76 input: req.input.map(|v| v.to_string()),
77 task_queue: req.task_queue,
78 overlap_policy: req.overlap_policy,
79 paused: false,
80 last_run_at: None,
81 next_run_at: None,
82 last_workflow_id: None,
83 created_at: now,
84 };
85
86 state.engine.create_schedule(&schedule).await?;
87
88 Ok((
89 axum::http::StatusCode::CREATED,
90 Json(serde_json::to_value(schedule)?),
91 ))
92}
93
94#[derive(Deserialize)]
95pub struct NsQuery {
96 #[serde(default = "default_namespace")]
97 namespace: String,
98}
99
100#[utoipa::path(
101 get, path = "/api/v1/schedules",
102 tag = "schedules",
103 params(("namespace" = Option<String>, Query, description = "Namespace (default: main)")),
104 responses((status = 200, description = "List of schedules", body = Vec<WorkflowSchedule>)),
105)]
106pub async fn list_schedules<S: WorkflowStore>(
107 State(state): State<Arc<AppState<S>>>,
108 Query(q): Query<NsQuery>,
109) -> Result<Json<Vec<serde_json::Value>>, AppError> {
110 let schedules = state.engine.list_schedules(&q.namespace).await?;
111 let json: Vec<serde_json::Value> = schedules
112 .into_iter()
113 .map(|s| serde_json::to_value(s).unwrap_or_default())
114 .collect();
115 Ok(Json(json))
116}
117
118#[utoipa::path(
119 get, path = "/api/v1/schedules/{name}",
120 tag = "schedules",
121 params(("name" = String, Path, description = "Schedule name")),
122 responses(
123 (status = 200, description = "Schedule details", body = WorkflowSchedule),
124 (status = 404, description = "Schedule not found"),
125 ),
126)]
127pub async fn get_schedule<S: WorkflowStore>(
128 State(state): State<Arc<AppState<S>>>,
129 Path(name): Path<String>,
130 Query(q): Query<NsQuery>,
131) -> Result<Json<serde_json::Value>, AppError> {
132 let schedule = state
133 .engine
134 .get_schedule(&q.namespace, &name)
135 .await?
136 .ok_or(AppError::NotFound(format!("schedule {name}")))?;
137
138 Ok(Json(serde_json::to_value(schedule)?))
139}
140
141#[utoipa::path(
142 delete, path = "/api/v1/schedules/{name}",
143 tag = "schedules",
144 params(("name" = String, Path, description = "Schedule name")),
145 responses(
146 (status = 200, description = "Schedule deleted"),
147 (status = 404, description = "Schedule not found"),
148 ),
149)]
150pub async fn delete_schedule<S: WorkflowStore>(
151 State(state): State<Arc<AppState<S>>>,
152 Path(name): Path<String>,
153 Query(q): Query<NsQuery>,
154) -> Result<axum::http::StatusCode, AppError> {
155 let deleted = state.engine.delete_schedule(&q.namespace, &name).await?;
156 if deleted {
157 Ok(axum::http::StatusCode::OK)
158 } else {
159 Err(AppError::NotFound(format!("schedule {name}")))
160 }
161}
162
/// Current wall-clock time as fractional seconds relative to the Unix epoch.
///
/// If the system clock is set to a time before the epoch, the result is the
/// negative offset from the epoch; the function never panics.
fn timestamp_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `duration()` is how far the clock is *before* the epoch.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}