1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::{Deserialize, Serialize};
7use utoipa::ToSchema;
8
9use crate::api::AppState;
10use crate::store::WorkflowStore;
11use crate::types::WorkflowStatus;
12
13pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
14 Router::new()
15 .route("/workflows", post(start_workflow).get(list_workflows))
16 .route("/workflows/{id}", get(describe_workflow))
17 .route("/workflows/{id}/events", get(get_events))
18 .route("/workflows/{id}/signal/{name}", post(send_signal))
19 .route("/workflows/{id}/cancel", post(cancel_workflow))
20 .route("/workflows/{id}/terminate", post(terminate_workflow))
21 .route("/workflows/{id}/children", get(list_children))
22 .route("/workflows/{id}/continue-as-new", post(continue_as_new))
23}
24
/// JSON request body accepted by `start_workflow`.
#[derive(Deserialize, ToSchema)]
pub struct StartWorkflowRequest {
    /// Namespace to start the workflow in; `start_workflow` falls back to `"main"` when absent.
    pub namespace: Option<String>,
    /// Workflow type, forwarded unchanged to the engine.
    pub workflow_type: String,
    /// Workflow ID, forwarded unchanged to the engine.
    pub workflow_id: String,
    /// Optional JSON input; the handler serializes it to a string before handing it to the engine.
    pub input: Option<serde_json::Value>,
    /// Task queue for the run; filled in by `default_queue` when the field is missing.
    #[serde(default = "default_queue")]
    pub task_queue: String,
}
39
/// Serde default for `StartWorkflowRequest::task_queue`.
fn default_queue() -> String {
    String::from("main")
}
43
/// JSON response returned by `start_workflow` and `continue_as_new`.
#[derive(Serialize, ToSchema)]
pub struct WorkflowResponse {
    /// Copied from the `id` field of the value returned by the engine.
    pub workflow_id: String,
    /// Copied from the `run_id` field of the value returned by the engine.
    pub run_id: String,
    /// Copied from the `status` field of the value returned by the engine.
    pub status: String,
}
50
51#[utoipa::path(
52 post, path = "/api/v1/workflows",
53 tag = "workflows",
54 request_body = StartWorkflowRequest,
55 responses(
56 (status = 201, description = "Workflow started", body = WorkflowResponse),
57 (status = 500, description = "Internal error"),
58 ),
59)]
60pub async fn start_workflow<S: WorkflowStore>(
61 State(state): State<Arc<AppState<S>>>,
62 Json(req): Json<StartWorkflowRequest>,
63) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
64 let input = req.input.map(|v| v.to_string());
65 let namespace = req.namespace.as_deref().unwrap_or("main");
66 let wf = state
67 .engine
68 .start_workflow(
69 namespace,
70 &req.workflow_type,
71 &req.workflow_id,
72 input.as_deref(),
73 &req.task_queue,
74 )
75 .await?;
76
77 Ok((
78 axum::http::StatusCode::CREATED,
79 Json(WorkflowResponse {
80 workflow_id: wf.id,
81 run_id: wf.run_id,
82 status: wf.status,
83 }),
84 ))
85}
86
/// Query-string parameters accepted by `list_workflows`.
#[derive(Deserialize)]
pub struct ListQuery {
    /// Namespace to list; filled in by `default_namespace` when the parameter is missing.
    #[serde(default = "default_namespace")]
    pub namespace: String,
    /// Optional status filter; `list_workflows` parses it as a `WorkflowStatus` and
    /// ignores values that fail to parse.
    pub status: Option<String>,
    /// Optional workflow-type filter; appears as `type` in the query string.
    #[serde(rename = "type")]
    pub workflow_type: Option<String>,
    /// Maximum number of results; filled in by `default_limit` when the parameter is missing.
    #[serde(default = "default_limit")]
    pub limit: i64,
    /// Pagination offset; `0` (the `i64` default) when the parameter is missing.
    #[serde(default)]
    pub offset: i64,
}
99
/// Serde default for `ListQuery::namespace`.
fn default_namespace() -> String {
    String::from("main")
}
103
/// Serde default for `ListQuery::limit`.
fn default_limit() -> i64 {
    const DEFAULT_LIMIT: i64 = 50;
    DEFAULT_LIMIT
}
107
108#[utoipa::path(
109 get, path = "/api/v1/workflows",
110 tag = "workflows",
111 params(
112 ("status" = Option<String>, Query, description = "Filter by status"),
113 ("type" = Option<String>, Query, description = "Filter by workflow type"),
114 ("limit" = Option<i64>, Query, description = "Max results (default 50)"),
115 ("offset" = Option<i64>, Query, description = "Pagination offset"),
116 ),
117 responses(
118 (status = 200, description = "List of workflows", body = Vec<WorkflowRecord>),
119 ),
120)]
121pub async fn list_workflows<S: WorkflowStore>(
122 State(state): State<Arc<AppState<S>>>,
123 Query(q): Query<ListQuery>,
124) -> Result<Json<Vec<serde_json::Value>>, AppError> {
125 let status = q
126 .status
127 .as_deref()
128 .and_then(|s| s.parse::<WorkflowStatus>().ok());
129
130 let workflows = state
131 .engine
132 .list_workflows(&q.namespace, status, q.workflow_type.as_deref(), q.limit, q.offset)
133 .await?;
134
135 let json: Vec<serde_json::Value> = workflows
136 .into_iter()
137 .map(|w| serde_json::to_value(w).unwrap_or_default())
138 .collect();
139
140 Ok(Json(json))
141}
142
143#[utoipa::path(
144 get, path = "/api/v1/workflows/{id}",
145 tag = "workflows",
146 params(("id" = String, Path, description = "Workflow ID")),
147 responses(
148 (status = 200, description = "Workflow details", body = WorkflowRecord),
149 (status = 404, description = "Workflow not found"),
150 ),
151)]
152pub async fn describe_workflow<S: WorkflowStore>(
153 State(state): State<Arc<AppState<S>>>,
154 Path(id): Path<String>,
155) -> Result<Json<serde_json::Value>, AppError> {
156 let wf = state
157 .engine
158 .get_workflow(&id)
159 .await?
160 .ok_or(AppError::NotFound(format!("workflow {id}")))?;
161
162 Ok(Json(serde_json::to_value(wf)?))
163}
164
165#[utoipa::path(
166 get, path = "/api/v1/workflows/{id}/events",
167 tag = "workflows",
168 params(("id" = String, Path, description = "Workflow ID")),
169 responses(
170 (status = 200, description = "Event history", body = Vec<WorkflowEvent>),
171 ),
172)]
173pub async fn get_events<S: WorkflowStore>(
174 State(state): State<Arc<AppState<S>>>,
175 Path(id): Path<String>,
176) -> Result<Json<Vec<serde_json::Value>>, AppError> {
177 let events = state.engine.get_events(&id).await?;
178 let json: Vec<serde_json::Value> = events
179 .into_iter()
180 .map(|e| serde_json::to_value(e).unwrap_or_default())
181 .collect();
182 Ok(Json(json))
183}
184
/// JSON body accepted by `send_signal`.
#[derive(Deserialize, ToSchema)]
pub struct SignalBody {
    /// Optional JSON payload; the handler serializes it to a string before passing it to the engine.
    pub payload: Option<serde_json::Value>,
}
189
190#[utoipa::path(
191 post, path = "/api/v1/workflows/{id}/signal/{name}",
192 tag = "workflows",
193 params(
194 ("id" = String, Path, description = "Workflow ID"),
195 ("name" = String, Path, description = "Signal name"),
196 ),
197 responses(
198 (status = 200, description = "Signal sent"),
199 ),
200)]
201pub async fn send_signal<S: WorkflowStore>(
202 State(state): State<Arc<AppState<S>>>,
203 Path((id, name)): Path<(String, String)>,
204 Json(body): Json<Option<SignalBody>>,
205) -> Result<axum::http::StatusCode, AppError> {
206 let payload = body.and_then(|b| b.payload).map(|v| v.to_string());
207 state
208 .engine
209 .send_signal(&id, &name, payload.as_deref())
210 .await?;
211 Ok(axum::http::StatusCode::OK)
212}
213
214#[utoipa::path(
215 post, path = "/api/v1/workflows/{id}/cancel",
216 tag = "workflows",
217 params(("id" = String, Path, description = "Workflow ID")),
218 responses(
219 (status = 200, description = "Workflow cancelled"),
220 (status = 404, description = "Workflow not found or already terminal"),
221 ),
222)]
223pub async fn cancel_workflow<S: WorkflowStore>(
224 State(state): State<Arc<AppState<S>>>,
225 Path(id): Path<String>,
226) -> Result<axum::http::StatusCode, AppError> {
227 let cancelled = state.engine.cancel_workflow(&id).await?;
228 if cancelled {
229 Ok(axum::http::StatusCode::OK)
230 } else {
231 Err(AppError::NotFound(format!(
232 "workflow {id} not found or already terminal"
233 )))
234 }
235}
236
/// JSON body accepted by `terminate_workflow`.
#[derive(Deserialize, ToSchema)]
pub struct TerminateBody {
    /// Optional reason, forwarded to the engine's `terminate_workflow` call.
    pub reason: Option<String>,
}
241
242#[utoipa::path(
243 post, path = "/api/v1/workflows/{id}/terminate",
244 tag = "workflows",
245 params(("id" = String, Path, description = "Workflow ID")),
246 responses(
247 (status = 200, description = "Workflow terminated"),
248 (status = 404, description = "Workflow not found or already terminal"),
249 ),
250)]
251pub async fn terminate_workflow<S: WorkflowStore>(
252 State(state): State<Arc<AppState<S>>>,
253 Path(id): Path<String>,
254 Json(body): Json<Option<TerminateBody>>,
255) -> Result<axum::http::StatusCode, AppError> {
256 let reason = body.and_then(|b| b.reason);
257 let terminated = state
258 .engine
259 .terminate_workflow(&id, reason.as_deref())
260 .await?;
261 if terminated {
262 Ok(axum::http::StatusCode::OK)
263 } else {
264 Err(AppError::NotFound(format!(
265 "workflow {id} not found or already terminal"
266 )))
267 }
268}
269
270#[utoipa::path(
271 get, path = "/api/v1/workflows/{id}/children",
272 tag = "workflows",
273 params(("id" = String, Path, description = "Parent workflow ID")),
274 responses(
275 (status = 200, description = "Child workflows", body = Vec<WorkflowRecord>),
276 ),
277)]
278pub async fn list_children<S: WorkflowStore>(
279 State(state): State<Arc<AppState<S>>>,
280 Path(id): Path<String>,
281) -> Result<Json<Vec<serde_json::Value>>, AppError> {
282 let children = state.engine.list_child_workflows(&id).await?;
283 let json: Vec<serde_json::Value> = children
284 .into_iter()
285 .map(|w| serde_json::to_value(w).unwrap_or_default())
286 .collect();
287 Ok(Json(json))
288}
289
/// JSON request body accepted by `continue_as_new`.
#[derive(Deserialize, ToSchema)]
pub struct ContinueAsNewBody {
    /// Optional JSON input; the handler serializes it to a string before passing it to the engine.
    pub input: Option<serde_json::Value>,
}
295
296#[utoipa::path(
297 post, path = "/api/v1/workflows/{id}/continue-as-new",
298 tag = "workflows",
299 params(("id" = String, Path, description = "Workflow ID to continue")),
300 request_body = ContinueAsNewBody,
301 responses(
302 (status = 201, description = "New workflow run started", body = WorkflowResponse),
303 ),
304)]
305pub async fn continue_as_new<S: WorkflowStore>(
306 State(state): State<Arc<AppState<S>>>,
307 Path(id): Path<String>,
308 Json(body): Json<ContinueAsNewBody>,
309) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
310 let input = body.input.map(|v| v.to_string());
311 let wf = state
312 .engine
313 .continue_as_new(&id, input.as_deref())
314 .await?;
315
316 Ok((
317 axum::http::StatusCode::CREATED,
318 Json(WorkflowResponse {
319 workflow_id: wf.id,
320 run_id: wf.run_id,
321 status: wf.status,
322 }),
323 ))
324}
325
326pub enum AppError {
329 Internal(anyhow::Error),
330 NotFound(String),
331}
332
333impl From<anyhow::Error> for AppError {
334 fn from(e: anyhow::Error) -> Self {
335 Self::Internal(e)
336 }
337}
338
339impl From<serde_json::Error> for AppError {
340 fn from(e: serde_json::Error) -> Self {
341 Self::Internal(e.into())
342 }
343}
344
345impl axum::response::IntoResponse for AppError {
346 fn into_response(self) -> axum::response::Response {
347 match self {
348 Self::Internal(e) => {
349 tracing::error!("Internal error: {e}");
350 (
351 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
352 Json(serde_json::json!({ "error": e.to_string() })),
353 )
354 .into_response()
355 }
356 Self::NotFound(msg) => (
357 axum::http::StatusCode::NOT_FOUND,
358 Json(serde_json::json!({ "error": format!("not found: {msg}") })),
359 )
360 .into_response(),
361 }
362 }
363}
364
365use crate::types::{WorkflowEvent, WorkflowRecord};