Skip to main content

assay_workflow/api/
workflows.rs

1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::{Deserialize, Serialize};
7use utoipa::ToSchema;
8
9use crate::api::AppState;
10use crate::store::WorkflowStore;
11use crate::types::WorkflowStatus;
12
13pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
14    Router::new()
15        .route("/workflows", post(start_workflow).get(list_workflows))
16        .route("/workflows/{id}", get(describe_workflow))
17        .route("/workflows/{id}/events", get(get_events))
18        .route("/workflows/{id}/signal/{name}", post(send_signal))
19        .route("/workflows/{id}/cancel", post(cancel_workflow))
20        .route("/workflows/{id}/terminate", post(terminate_workflow))
21        .route("/workflows/{id}/children", get(list_children))
22        .route("/workflows/{id}/continue-as-new", post(continue_as_new))
23}
24
25#[derive(Deserialize, ToSchema)]
26pub struct StartWorkflowRequest {
27    /// Namespace (default: "main")
28    pub namespace: Option<String>,
29    /// Workflow type name (e.g. "IngestData", "DeployService")
30    pub workflow_type: String,
31    /// Unique workflow ID (caller-provided for idempotency)
32    pub workflow_id: String,
33    /// Optional JSON input passed to the workflow
34    pub input: Option<serde_json::Value>,
35    /// Task queue to route the workflow to (default: "main")
36    #[serde(default = "default_queue")]
37    pub task_queue: String,
38}
39
/// Serde default for the `task_queue` field of `StartWorkflowRequest`:
/// the `"main"` queue.
fn default_queue() -> String {
    String::from("main")
}
43
44#[derive(Serialize, ToSchema)]
45pub struct WorkflowResponse {
46    pub workflow_id: String,
47    pub run_id: String,
48    pub status: String,
49}
50
51#[utoipa::path(
52    post, path = "/api/v1/workflows",
53    tag = "workflows",
54    request_body = StartWorkflowRequest,
55    responses(
56        (status = 201, description = "Workflow started", body = WorkflowResponse),
57        (status = 500, description = "Internal error"),
58    ),
59)]
60pub async fn start_workflow<S: WorkflowStore>(
61    State(state): State<Arc<AppState<S>>>,
62    Json(req): Json<StartWorkflowRequest>,
63) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
64    let input = req.input.map(|v| v.to_string());
65    let namespace = req.namespace.as_deref().unwrap_or("main");
66    let wf = state
67        .engine
68        .start_workflow(
69            namespace,
70            &req.workflow_type,
71            &req.workflow_id,
72            input.as_deref(),
73            &req.task_queue,
74        )
75        .await?;
76
77    Ok((
78        axum::http::StatusCode::CREATED,
79        Json(WorkflowResponse {
80            workflow_id: wf.id,
81            run_id: wf.run_id,
82            status: wf.status,
83        }),
84    ))
85}
86
87#[derive(Deserialize)]
88pub struct ListQuery {
89    #[serde(default = "default_namespace")]
90    pub namespace: String,
91    pub status: Option<String>,
92    #[serde(rename = "type")]
93    pub workflow_type: Option<String>,
94    #[serde(default = "default_limit")]
95    pub limit: i64,
96    #[serde(default)]
97    pub offset: i64,
98}
99
/// Serde default for the `namespace` field of `ListQuery`: the `"main"`
/// namespace.
fn default_namespace() -> String {
    String::from("main")
}
103
/// Serde default for the `limit` field of `ListQuery`.
fn default_limit() -> i64 {
    const DEFAULT_PAGE_SIZE: i64 = 50;
    DEFAULT_PAGE_SIZE
}
107
108#[utoipa::path(
109    get, path = "/api/v1/workflows",
110    tag = "workflows",
111    params(
112        ("status" = Option<String>, Query, description = "Filter by status"),
113        ("type" = Option<String>, Query, description = "Filter by workflow type"),
114        ("limit" = Option<i64>, Query, description = "Max results (default 50)"),
115        ("offset" = Option<i64>, Query, description = "Pagination offset"),
116    ),
117    responses(
118        (status = 200, description = "List of workflows", body = Vec<WorkflowRecord>),
119    ),
120)]
121pub async fn list_workflows<S: WorkflowStore>(
122    State(state): State<Arc<AppState<S>>>,
123    Query(q): Query<ListQuery>,
124) -> Result<Json<Vec<serde_json::Value>>, AppError> {
125    let status = q
126        .status
127        .as_deref()
128        .and_then(|s| s.parse::<WorkflowStatus>().ok());
129
130    let workflows = state
131        .engine
132        .list_workflows(&q.namespace, status, q.workflow_type.as_deref(), q.limit, q.offset)
133        .await?;
134
135    let json: Vec<serde_json::Value> = workflows
136        .into_iter()
137        .map(|w| serde_json::to_value(w).unwrap_or_default())
138        .collect();
139
140    Ok(Json(json))
141}
142
143#[utoipa::path(
144    get, path = "/api/v1/workflows/{id}",
145    tag = "workflows",
146    params(("id" = String, Path, description = "Workflow ID")),
147    responses(
148        (status = 200, description = "Workflow details", body = WorkflowRecord),
149        (status = 404, description = "Workflow not found"),
150    ),
151)]
152pub async fn describe_workflow<S: WorkflowStore>(
153    State(state): State<Arc<AppState<S>>>,
154    Path(id): Path<String>,
155) -> Result<Json<serde_json::Value>, AppError> {
156    let wf = state
157        .engine
158        .get_workflow(&id)
159        .await?
160        .ok_or(AppError::NotFound(format!("workflow {id}")))?;
161
162    Ok(Json(serde_json::to_value(wf)?))
163}
164
165#[utoipa::path(
166    get, path = "/api/v1/workflows/{id}/events",
167    tag = "workflows",
168    params(("id" = String, Path, description = "Workflow ID")),
169    responses(
170        (status = 200, description = "Event history", body = Vec<WorkflowEvent>),
171    ),
172)]
173pub async fn get_events<S: WorkflowStore>(
174    State(state): State<Arc<AppState<S>>>,
175    Path(id): Path<String>,
176) -> Result<Json<Vec<serde_json::Value>>, AppError> {
177    let events = state.engine.get_events(&id).await?;
178    let json: Vec<serde_json::Value> = events
179        .into_iter()
180        .map(|e| serde_json::to_value(e).unwrap_or_default())
181        .collect();
182    Ok(Json(json))
183}
184
185#[derive(Deserialize, ToSchema)]
186pub struct SignalBody {
187    pub payload: Option<serde_json::Value>,
188}
189
190#[utoipa::path(
191    post, path = "/api/v1/workflows/{id}/signal/{name}",
192    tag = "workflows",
193    params(
194        ("id" = String, Path, description = "Workflow ID"),
195        ("name" = String, Path, description = "Signal name"),
196    ),
197    responses(
198        (status = 200, description = "Signal sent"),
199    ),
200)]
201pub async fn send_signal<S: WorkflowStore>(
202    State(state): State<Arc<AppState<S>>>,
203    Path((id, name)): Path<(String, String)>,
204    Json(body): Json<Option<SignalBody>>,
205) -> Result<axum::http::StatusCode, AppError> {
206    let payload = body.and_then(|b| b.payload).map(|v| v.to_string());
207    state
208        .engine
209        .send_signal(&id, &name, payload.as_deref())
210        .await?;
211    Ok(axum::http::StatusCode::OK)
212}
213
214#[utoipa::path(
215    post, path = "/api/v1/workflows/{id}/cancel",
216    tag = "workflows",
217    params(("id" = String, Path, description = "Workflow ID")),
218    responses(
219        (status = 200, description = "Workflow cancelled"),
220        (status = 404, description = "Workflow not found or already terminal"),
221    ),
222)]
223pub async fn cancel_workflow<S: WorkflowStore>(
224    State(state): State<Arc<AppState<S>>>,
225    Path(id): Path<String>,
226) -> Result<axum::http::StatusCode, AppError> {
227    let cancelled = state.engine.cancel_workflow(&id).await?;
228    if cancelled {
229        Ok(axum::http::StatusCode::OK)
230    } else {
231        Err(AppError::NotFound(format!(
232            "workflow {id} not found or already terminal"
233        )))
234    }
235}
236
237#[derive(Deserialize, ToSchema)]
238pub struct TerminateBody {
239    pub reason: Option<String>,
240}
241
242#[utoipa::path(
243    post, path = "/api/v1/workflows/{id}/terminate",
244    tag = "workflows",
245    params(("id" = String, Path, description = "Workflow ID")),
246    responses(
247        (status = 200, description = "Workflow terminated"),
248        (status = 404, description = "Workflow not found or already terminal"),
249    ),
250)]
251pub async fn terminate_workflow<S: WorkflowStore>(
252    State(state): State<Arc<AppState<S>>>,
253    Path(id): Path<String>,
254    Json(body): Json<Option<TerminateBody>>,
255) -> Result<axum::http::StatusCode, AppError> {
256    let reason = body.and_then(|b| b.reason);
257    let terminated = state
258        .engine
259        .terminate_workflow(&id, reason.as_deref())
260        .await?;
261    if terminated {
262        Ok(axum::http::StatusCode::OK)
263    } else {
264        Err(AppError::NotFound(format!(
265            "workflow {id} not found or already terminal"
266        )))
267    }
268}
269
270#[utoipa::path(
271    get, path = "/api/v1/workflows/{id}/children",
272    tag = "workflows",
273    params(("id" = String, Path, description = "Parent workflow ID")),
274    responses(
275        (status = 200, description = "Child workflows", body = Vec<WorkflowRecord>),
276    ),
277)]
278pub async fn list_children<S: WorkflowStore>(
279    State(state): State<Arc<AppState<S>>>,
280    Path(id): Path<String>,
281) -> Result<Json<Vec<serde_json::Value>>, AppError> {
282    let children = state.engine.list_child_workflows(&id).await?;
283    let json: Vec<serde_json::Value> = children
284        .into_iter()
285        .map(|w| serde_json::to_value(w).unwrap_or_default())
286        .collect();
287    Ok(Json(json))
288}
289
290#[derive(Deserialize, ToSchema)]
291pub struct ContinueAsNewBody {
292    /// New input for the continued workflow run
293    pub input: Option<serde_json::Value>,
294}
295
296#[utoipa::path(
297    post, path = "/api/v1/workflows/{id}/continue-as-new",
298    tag = "workflows",
299    params(("id" = String, Path, description = "Workflow ID to continue")),
300    request_body = ContinueAsNewBody,
301    responses(
302        (status = 201, description = "New workflow run started", body = WorkflowResponse),
303    ),
304)]
305pub async fn continue_as_new<S: WorkflowStore>(
306    State(state): State<Arc<AppState<S>>>,
307    Path(id): Path<String>,
308    Json(body): Json<ContinueAsNewBody>,
309) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
310    let input = body.input.map(|v| v.to_string());
311    let wf = state
312        .engine
313        .continue_as_new(&id, input.as_deref())
314        .await?;
315
316    Ok((
317        axum::http::StatusCode::CREATED,
318        Json(WorkflowResponse {
319            workflow_id: wf.id,
320            run_id: wf.run_id,
321            status: wf.status,
322        }),
323    ))
324}
325
326// ── Error type ──────────────────────────────────────────────
327
328pub enum AppError {
329    Internal(anyhow::Error),
330    NotFound(String),
331}
332
333impl From<anyhow::Error> for AppError {
334    fn from(e: anyhow::Error) -> Self {
335        Self::Internal(e)
336    }
337}
338
339impl From<serde_json::Error> for AppError {
340    fn from(e: serde_json::Error) -> Self {
341        Self::Internal(e.into())
342    }
343}
344
345impl axum::response::IntoResponse for AppError {
346    fn into_response(self) -> axum::response::Response {
347        match self {
348            Self::Internal(e) => {
349                tracing::error!("Internal error: {e}");
350                (
351                    axum::http::StatusCode::INTERNAL_SERVER_ERROR,
352                    Json(serde_json::json!({ "error": e.to_string() })),
353                )
354                    .into_response()
355            }
356            Self::NotFound(msg) => (
357                axum::http::StatusCode::NOT_FOUND,
358                Json(serde_json::json!({ "error": format!("not found: {msg}") })),
359            )
360                .into_response(),
361        }
362    }
363}
364
// Imported for the `body = ...` references in the `#[utoipa::path]` attributes above (both types come from `crate::types`).
366use crate::types::{WorkflowEvent, WorkflowRecord};