1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::{Deserialize, Serialize};
7use utoipa::ToSchema;
8
9use crate::api::AppState;
10use crate::store::WorkflowStore;
11use crate::types::WorkflowStatus;
12
13pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
14 Router::new()
15 .route("/workflows", post(start_workflow).get(list_workflows))
16 .route("/workflows/{id}", get(describe_workflow))
17 .route("/workflows/{id}/events", get(get_events))
18 .route("/workflows/{id}/signal/{name}", post(send_signal))
19 .route("/workflows/{id}/cancel", post(cancel_workflow))
20 .route("/workflows/{id}/terminate", post(terminate_workflow))
21 .route("/workflows/{id}/children", get(list_children))
22 .route("/workflows/{id}/continue-as-new", post(continue_as_new))
23 .route("/workflows/{id}/state", get(get_workflow_state))
24 .route("/workflows/{id}/state/{name}", get(get_workflow_state_by_name))
25}
26
27#[derive(Deserialize, ToSchema)]
28pub struct StartWorkflowRequest {
29 pub namespace: Option<String>,
31 pub workflow_type: String,
33 pub workflow_id: String,
35 pub input: Option<serde_json::Value>,
37 #[serde(default = "default_queue")]
39 pub task_queue: String,
40 pub search_attributes: Option<serde_json::Value>,
44}
45
/// Serde default for `StartWorkflowRequest::task_queue`.
fn default_queue() -> String {
    String::from("main")
}
49
50#[derive(Serialize, ToSchema)]
51pub struct WorkflowResponse {
52 pub workflow_id: String,
53 pub run_id: String,
54 pub status: String,
55}
56
57#[utoipa::path(
58 post, path = "/api/v1/workflows",
59 tag = "workflows",
60 request_body = StartWorkflowRequest,
61 responses(
62 (status = 201, description = "Workflow started", body = WorkflowResponse),
63 (status = 500, description = "Internal error"),
64 ),
65)]
66pub async fn start_workflow<S: WorkflowStore>(
67 State(state): State<Arc<AppState<S>>>,
68 Json(req): Json<StartWorkflowRequest>,
69) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
70 let input = req.input.map(|v| v.to_string());
71 let namespace = req.namespace.as_deref().unwrap_or("main");
72 let search_attributes = req.search_attributes.map(|v| v.to_string());
73 let wf = state
74 .engine
75 .start_workflow(
76 namespace,
77 &req.workflow_type,
78 &req.workflow_id,
79 input.as_deref(),
80 &req.task_queue,
81 search_attributes.as_deref(),
82 )
83 .await?;
84
85 Ok((
86 axum::http::StatusCode::CREATED,
87 Json(WorkflowResponse {
88 workflow_id: wf.id,
89 run_id: wf.run_id,
90 status: wf.status,
91 }),
92 ))
93}
94
95#[derive(Deserialize)]
96pub struct ListQuery {
97 #[serde(default = "default_namespace")]
98 pub namespace: String,
99 pub status: Option<String>,
100 #[serde(rename = "type")]
101 pub workflow_type: Option<String>,
102 pub search_attrs: Option<String>,
106 #[serde(default = "default_limit")]
107 pub limit: i64,
108 #[serde(default)]
109 pub offset: i64,
110}
111
/// Serde default for `ListQuery::namespace`.
fn default_namespace() -> String {
    String::from("main")
}
115
/// Serde default for `ListQuery::limit`.
fn default_limit() -> i64 {
    const DEFAULT_PAGE_SIZE: i64 = 50;
    DEFAULT_PAGE_SIZE
}
119
120#[utoipa::path(
121 get, path = "/api/v1/workflows",
122 tag = "workflows",
123 params(
124 ("status" = Option<String>, Query, description = "Filter by status"),
125 ("type" = Option<String>, Query, description = "Filter by workflow type"),
126 ("limit" = Option<i64>, Query, description = "Max results (default 50)"),
127 ("offset" = Option<i64>, Query, description = "Pagination offset"),
128 ),
129 responses(
130 (status = 200, description = "List of workflows", body = Vec<WorkflowRecord>),
131 ),
132)]
133pub async fn list_workflows<S: WorkflowStore>(
134 State(state): State<Arc<AppState<S>>>,
135 Query(q): Query<ListQuery>,
136) -> Result<Json<Vec<serde_json::Value>>, AppError> {
137 let status = q
138 .status
139 .as_deref()
140 .and_then(|s| s.parse::<WorkflowStatus>().ok());
141
142 let workflows = state
143 .engine
144 .list_workflows(
145 &q.namespace,
146 status,
147 q.workflow_type.as_deref(),
148 q.search_attrs.as_deref(),
149 q.limit,
150 q.offset,
151 )
152 .await?;
153
154 let json: Vec<serde_json::Value> = workflows
155 .into_iter()
156 .map(|w| serde_json::to_value(w).unwrap_or_default())
157 .collect();
158
159 Ok(Json(json))
160}
161
162#[utoipa::path(
163 get, path = "/api/v1/workflows/{id}",
164 tag = "workflows",
165 params(("id" = String, Path, description = "Workflow ID")),
166 responses(
167 (status = 200, description = "Workflow details", body = WorkflowRecord),
168 (status = 404, description = "Workflow not found"),
169 ),
170)]
171pub async fn describe_workflow<S: WorkflowStore>(
172 State(state): State<Arc<AppState<S>>>,
173 Path(id): Path<String>,
174) -> Result<Json<serde_json::Value>, AppError> {
175 let wf = state
176 .engine
177 .get_workflow(&id)
178 .await?
179 .ok_or(AppError::NotFound(format!("workflow {id}")))?;
180
181 Ok(Json(serde_json::to_value(wf)?))
182}
183
184#[utoipa::path(
185 get, path = "/api/v1/workflows/{id}/events",
186 tag = "workflows",
187 params(("id" = String, Path, description = "Workflow ID")),
188 responses(
189 (status = 200, description = "Event history", body = Vec<WorkflowEvent>),
190 ),
191)]
192pub async fn get_events<S: WorkflowStore>(
193 State(state): State<Arc<AppState<S>>>,
194 Path(id): Path<String>,
195) -> Result<Json<Vec<serde_json::Value>>, AppError> {
196 let events = state.engine.get_events(&id).await?;
197 let json: Vec<serde_json::Value> = events
198 .into_iter()
199 .map(|e| serde_json::to_value(e).unwrap_or_default())
200 .collect();
201 Ok(Json(json))
202}
203
204#[derive(Deserialize, ToSchema)]
205pub struct SignalBody {
206 pub payload: Option<serde_json::Value>,
207}
208
209#[utoipa::path(
210 post, path = "/api/v1/workflows/{id}/signal/{name}",
211 tag = "workflows",
212 params(
213 ("id" = String, Path, description = "Workflow ID"),
214 ("name" = String, Path, description = "Signal name"),
215 ),
216 responses(
217 (status = 200, description = "Signal sent"),
218 ),
219)]
220pub async fn send_signal<S: WorkflowStore>(
221 State(state): State<Arc<AppState<S>>>,
222 Path((id, name)): Path<(String, String)>,
223 Json(body): Json<Option<SignalBody>>,
224) -> Result<axum::http::StatusCode, AppError> {
225 let payload = body.and_then(|b| b.payload).map(|v| v.to_string());
226 state
227 .engine
228 .send_signal(&id, &name, payload.as_deref())
229 .await?;
230 Ok(axum::http::StatusCode::OK)
231}
232
233#[utoipa::path(
234 post, path = "/api/v1/workflows/{id}/cancel",
235 tag = "workflows",
236 params(("id" = String, Path, description = "Workflow ID")),
237 responses(
238 (status = 200, description = "Workflow cancelled"),
239 (status = 404, description = "Workflow not found or already terminal"),
240 ),
241)]
242pub async fn cancel_workflow<S: WorkflowStore>(
243 State(state): State<Arc<AppState<S>>>,
244 Path(id): Path<String>,
245) -> Result<axum::http::StatusCode, AppError> {
246 let cancelled = state.engine.cancel_workflow(&id).await?;
247 if cancelled {
248 Ok(axum::http::StatusCode::OK)
249 } else {
250 Err(AppError::NotFound(format!(
251 "workflow {id} not found or already terminal"
252 )))
253 }
254}
255
256#[derive(Deserialize, ToSchema)]
257pub struct TerminateBody {
258 pub reason: Option<String>,
259}
260
261#[utoipa::path(
262 post, path = "/api/v1/workflows/{id}/terminate",
263 tag = "workflows",
264 params(("id" = String, Path, description = "Workflow ID")),
265 responses(
266 (status = 200, description = "Workflow terminated"),
267 (status = 404, description = "Workflow not found or already terminal"),
268 ),
269)]
270pub async fn terminate_workflow<S: WorkflowStore>(
271 State(state): State<Arc<AppState<S>>>,
272 Path(id): Path<String>,
273 Json(body): Json<Option<TerminateBody>>,
274) -> Result<axum::http::StatusCode, AppError> {
275 let reason = body.and_then(|b| b.reason);
276 let terminated = state
277 .engine
278 .terminate_workflow(&id, reason.as_deref())
279 .await?;
280 if terminated {
281 Ok(axum::http::StatusCode::OK)
282 } else {
283 Err(AppError::NotFound(format!(
284 "workflow {id} not found or already terminal"
285 )))
286 }
287}
288
289#[utoipa::path(
290 get, path = "/api/v1/workflows/{id}/children",
291 tag = "workflows",
292 params(("id" = String, Path, description = "Parent workflow ID")),
293 responses(
294 (status = 200, description = "Child workflows", body = Vec<WorkflowRecord>),
295 ),
296)]
297pub async fn list_children<S: WorkflowStore>(
298 State(state): State<Arc<AppState<S>>>,
299 Path(id): Path<String>,
300) -> Result<Json<Vec<serde_json::Value>>, AppError> {
301 let children = state.engine.list_child_workflows(&id).await?;
302 let json: Vec<serde_json::Value> = children
303 .into_iter()
304 .map(|w| serde_json::to_value(w).unwrap_or_default())
305 .collect();
306 Ok(Json(json))
307}
308
309#[derive(Deserialize, ToSchema)]
310pub struct ContinueAsNewBody {
311 pub input: Option<serde_json::Value>,
313}
314
315#[utoipa::path(
316 post, path = "/api/v1/workflows/{id}/continue-as-new",
317 tag = "workflows",
318 params(("id" = String, Path, description = "Workflow ID to continue")),
319 request_body = ContinueAsNewBody,
320 responses(
321 (status = 201, description = "New workflow run started", body = WorkflowResponse),
322 ),
323)]
324pub async fn continue_as_new<S: WorkflowStore>(
325 State(state): State<Arc<AppState<S>>>,
326 Path(id): Path<String>,
327 Json(body): Json<ContinueAsNewBody>,
328) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
329 let input = body.input.map(|v| v.to_string());
330 let wf = state
331 .engine
332 .continue_as_new(&id, input.as_deref())
333 .await?;
334
335 Ok((
336 axum::http::StatusCode::CREATED,
337 Json(WorkflowResponse {
338 workflow_id: wf.id,
339 run_id: wf.run_id,
340 status: wf.status,
341 }),
342 ))
343}
344
345#[utoipa::path(
355 get, path = "/api/v1/workflows/{id}/state",
356 tag = "workflows",
357 params(("id" = String, Path, description = "Workflow ID")),
358 responses(
359 (status = 200, description = "Latest state snapshot"),
360 (status = 404, description = "No snapshot recorded for this workflow"),
361 ),
362)]
363pub async fn get_workflow_state<S: WorkflowStore>(
364 State(state): State<Arc<AppState<S>>>,
365 Path(id): Path<String>,
366) -> Result<Json<serde_json::Value>, AppError> {
367 let snapshot = state
368 .engine
369 .get_latest_snapshot(&id)
370 .await?
371 .ok_or_else(|| AppError::NotFound(format!("state for workflow {id}")))?;
372
373 let parsed: serde_json::Value =
374 serde_json::from_str(&snapshot.state_json).unwrap_or(serde_json::Value::Null);
375
376 Ok(Json(serde_json::json!({
377 "state": parsed,
378 "event_seq": snapshot.event_seq,
379 "created_at": snapshot.created_at,
380 })))
381}
382
383#[utoipa::path(
388 get, path = "/api/v1/workflows/{id}/state/{name}",
389 tag = "workflows",
390 params(
391 ("id" = String, Path, description = "Workflow ID"),
392 ("name" = String, Path, description = "Query handler name"),
393 ),
394 responses(
395 (status = 200, description = "Query value"),
396 (status = 404, description = "No snapshot or key not present"),
397 ),
398)]
399pub async fn get_workflow_state_by_name<S: WorkflowStore>(
400 State(state): State<Arc<AppState<S>>>,
401 Path((id, name)): Path<(String, String)>,
402) -> Result<Json<serde_json::Value>, AppError> {
403 let snapshot = state
404 .engine
405 .get_latest_snapshot(&id)
406 .await?
407 .ok_or_else(|| AppError::NotFound(format!("state for workflow {id}")))?;
408
409 let parsed: serde_json::Value =
410 serde_json::from_str(&snapshot.state_json).unwrap_or(serde_json::Value::Null);
411
412 let value = parsed
413 .get(&name)
414 .cloned()
415 .ok_or_else(|| AppError::NotFound(format!("query '{name}' for workflow {id}")))?;
416
417 Ok(Json(serde_json::json!({
418 "value": value,
419 "event_seq": snapshot.event_seq,
420 "created_at": snapshot.created_at,
421 })))
422}
423
424pub enum AppError {
427 Internal(anyhow::Error),
428 NotFound(String),
429}
430
431impl From<anyhow::Error> for AppError {
432 fn from(e: anyhow::Error) -> Self {
433 Self::Internal(e)
434 }
435}
436
437impl From<serde_json::Error> for AppError {
438 fn from(e: serde_json::Error) -> Self {
439 Self::Internal(e.into())
440 }
441}
442
443impl axum::response::IntoResponse for AppError {
444 fn into_response(self) -> axum::response::Response {
445 match self {
446 Self::Internal(e) => {
447 tracing::error!("Internal error: {e}");
448 (
449 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
450 Json(serde_json::json!({ "error": e.to_string() })),
451 )
452 .into_response()
453 }
454 Self::NotFound(msg) => (
455 axum::http::StatusCode::NOT_FOUND,
456 Json(serde_json::json!({ "error": format!("not found: {msg}") })),
457 )
458 .into_response(),
459 }
460 }
461}
462
463use crate::types::{WorkflowEvent, WorkflowRecord};