Skip to main content

assay_workflow/api/
workflows.rs

1use std::sync::Arc;
2
3use axum::extract::{Path, Query, State};
4use axum::routing::{get, post};
5use axum::{Json, Router};
6use serde::{Deserialize, Serialize};
7use utoipa::ToSchema;
8
9use crate::api::AppState;
10use crate::store::WorkflowStore;
11use crate::types::WorkflowStatus;
12
13pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
14    Router::new()
15        .route("/workflows", post(start_workflow).get(list_workflows))
16        .route("/workflows/{id}", get(describe_workflow))
17        .route("/workflows/{id}/events", get(get_events))
18        .route("/workflows/{id}/signal/{name}", post(send_signal))
19        .route("/workflows/{id}/cancel", post(cancel_workflow))
20        .route("/workflows/{id}/terminate", post(terminate_workflow))
21        .route("/workflows/{id}/children", get(list_children))
22        .route("/workflows/{id}/continue-as-new", post(continue_as_new))
23        .route("/workflows/{id}/state", get(get_workflow_state))
24        .route("/workflows/{id}/state/{name}", get(get_workflow_state_by_name))
25}
26
27#[derive(Deserialize, ToSchema)]
28pub struct StartWorkflowRequest {
29    /// Namespace (default: "main")
30    pub namespace: Option<String>,
31    /// Workflow type name (e.g. "IngestData", "DeployService")
32    pub workflow_type: String,
33    /// Unique workflow ID (caller-provided for idempotency)
34    pub workflow_id: String,
35    /// Optional JSON input passed to the workflow
36    pub input: Option<serde_json::Value>,
37    /// Task queue to route the workflow to (default: "main")
38    #[serde(default = "default_queue")]
39    pub task_queue: String,
40    /// Optional indexed metadata (JSON object). Used by list-filtering;
41    /// workflows can also update it at runtime via
42    /// `ctx:upsert_search_attributes(...)`.
43    pub search_attributes: Option<serde_json::Value>,
44}
45
46fn default_queue() -> String {
47    "main".to_string()
48}
49
50#[derive(Serialize, ToSchema)]
51pub struct WorkflowResponse {
52    pub workflow_id: String,
53    pub run_id: String,
54    pub status: String,
55}
56
57#[utoipa::path(
58    post, path = "/api/v1/workflows",
59    tag = "workflows",
60    request_body = StartWorkflowRequest,
61    responses(
62        (status = 201, description = "Workflow started", body = WorkflowResponse),
63        (status = 500, description = "Internal error"),
64    ),
65)]
66pub async fn start_workflow<S: WorkflowStore>(
67    State(state): State<Arc<AppState<S>>>,
68    Json(req): Json<StartWorkflowRequest>,
69) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
70    let input = req.input.map(|v| v.to_string());
71    let namespace = req.namespace.as_deref().unwrap_or("main");
72    let search_attributes = req.search_attributes.map(|v| v.to_string());
73    let wf = state
74        .engine
75        .start_workflow(
76            namespace,
77            &req.workflow_type,
78            &req.workflow_id,
79            input.as_deref(),
80            &req.task_queue,
81            search_attributes.as_deref(),
82        )
83        .await?;
84
85    Ok((
86        axum::http::StatusCode::CREATED,
87        Json(WorkflowResponse {
88            workflow_id: wf.id,
89            run_id: wf.run_id,
90            status: wf.status,
91        }),
92    ))
93}
94
95#[derive(Deserialize)]
96pub struct ListQuery {
97    #[serde(default = "default_namespace")]
98    pub namespace: String,
99    pub status: Option<String>,
100    #[serde(rename = "type")]
101    pub workflow_type: Option<String>,
102    /// URL-encoded JSON object; matches workflows whose `search_attributes`
103    /// contain every listed key at the given value. e.g.
104    /// `?search_attrs=%7B%22env%22%3A%22prod%22%7D` for `{"env":"prod"}`.
105    pub search_attrs: Option<String>,
106    #[serde(default = "default_limit")]
107    pub limit: i64,
108    #[serde(default)]
109    pub offset: i64,
110}
111
112fn default_namespace() -> String {
113    "main".to_string()
114}
115
116fn default_limit() -> i64 {
117    50
118}
119
120#[utoipa::path(
121    get, path = "/api/v1/workflows",
122    tag = "workflows",
123    params(
124        ("status" = Option<String>, Query, description = "Filter by status"),
125        ("type" = Option<String>, Query, description = "Filter by workflow type"),
126        ("limit" = Option<i64>, Query, description = "Max results (default 50)"),
127        ("offset" = Option<i64>, Query, description = "Pagination offset"),
128    ),
129    responses(
130        (status = 200, description = "List of workflows", body = Vec<WorkflowRecord>),
131    ),
132)]
133pub async fn list_workflows<S: WorkflowStore>(
134    State(state): State<Arc<AppState<S>>>,
135    Query(q): Query<ListQuery>,
136) -> Result<Json<Vec<serde_json::Value>>, AppError> {
137    let status = q
138        .status
139        .as_deref()
140        .and_then(|s| s.parse::<WorkflowStatus>().ok());
141
142    let workflows = state
143        .engine
144        .list_workflows(
145            &q.namespace,
146            status,
147            q.workflow_type.as_deref(),
148            q.search_attrs.as_deref(),
149            q.limit,
150            q.offset,
151        )
152        .await?;
153
154    let json: Vec<serde_json::Value> = workflows
155        .into_iter()
156        .map(|w| serde_json::to_value(w).unwrap_or_default())
157        .collect();
158
159    Ok(Json(json))
160}
161
162#[utoipa::path(
163    get, path = "/api/v1/workflows/{id}",
164    tag = "workflows",
165    params(("id" = String, Path, description = "Workflow ID")),
166    responses(
167        (status = 200, description = "Workflow details", body = WorkflowRecord),
168        (status = 404, description = "Workflow not found"),
169    ),
170)]
171pub async fn describe_workflow<S: WorkflowStore>(
172    State(state): State<Arc<AppState<S>>>,
173    Path(id): Path<String>,
174) -> Result<Json<serde_json::Value>, AppError> {
175    let wf = state
176        .engine
177        .get_workflow(&id)
178        .await?
179        .ok_or(AppError::NotFound(format!("workflow {id}")))?;
180
181    Ok(Json(serde_json::to_value(wf)?))
182}
183
184#[utoipa::path(
185    get, path = "/api/v1/workflows/{id}/events",
186    tag = "workflows",
187    params(("id" = String, Path, description = "Workflow ID")),
188    responses(
189        (status = 200, description = "Event history", body = Vec<WorkflowEvent>),
190    ),
191)]
192pub async fn get_events<S: WorkflowStore>(
193    State(state): State<Arc<AppState<S>>>,
194    Path(id): Path<String>,
195) -> Result<Json<Vec<serde_json::Value>>, AppError> {
196    let events = state.engine.get_events(&id).await?;
197    let json: Vec<serde_json::Value> = events
198        .into_iter()
199        .map(|e| serde_json::to_value(e).unwrap_or_default())
200        .collect();
201    Ok(Json(json))
202}
203
204#[derive(Deserialize, ToSchema)]
205pub struct SignalBody {
206    pub payload: Option<serde_json::Value>,
207}
208
209#[utoipa::path(
210    post, path = "/api/v1/workflows/{id}/signal/{name}",
211    tag = "workflows",
212    params(
213        ("id" = String, Path, description = "Workflow ID"),
214        ("name" = String, Path, description = "Signal name"),
215    ),
216    responses(
217        (status = 200, description = "Signal sent"),
218    ),
219)]
220pub async fn send_signal<S: WorkflowStore>(
221    State(state): State<Arc<AppState<S>>>,
222    Path((id, name)): Path<(String, String)>,
223    Json(body): Json<Option<SignalBody>>,
224) -> Result<axum::http::StatusCode, AppError> {
225    let payload = body.and_then(|b| b.payload).map(|v| v.to_string());
226    state
227        .engine
228        .send_signal(&id, &name, payload.as_deref())
229        .await?;
230    Ok(axum::http::StatusCode::OK)
231}
232
233#[utoipa::path(
234    post, path = "/api/v1/workflows/{id}/cancel",
235    tag = "workflows",
236    params(("id" = String, Path, description = "Workflow ID")),
237    responses(
238        (status = 200, description = "Workflow cancelled"),
239        (status = 404, description = "Workflow not found or already terminal"),
240    ),
241)]
242pub async fn cancel_workflow<S: WorkflowStore>(
243    State(state): State<Arc<AppState<S>>>,
244    Path(id): Path<String>,
245) -> Result<axum::http::StatusCode, AppError> {
246    let cancelled = state.engine.cancel_workflow(&id).await?;
247    if cancelled {
248        Ok(axum::http::StatusCode::OK)
249    } else {
250        Err(AppError::NotFound(format!(
251            "workflow {id} not found or already terminal"
252        )))
253    }
254}
255
256#[derive(Deserialize, ToSchema)]
257pub struct TerminateBody {
258    pub reason: Option<String>,
259}
260
261#[utoipa::path(
262    post, path = "/api/v1/workflows/{id}/terminate",
263    tag = "workflows",
264    params(("id" = String, Path, description = "Workflow ID")),
265    responses(
266        (status = 200, description = "Workflow terminated"),
267        (status = 404, description = "Workflow not found or already terminal"),
268    ),
269)]
270pub async fn terminate_workflow<S: WorkflowStore>(
271    State(state): State<Arc<AppState<S>>>,
272    Path(id): Path<String>,
273    Json(body): Json<Option<TerminateBody>>,
274) -> Result<axum::http::StatusCode, AppError> {
275    let reason = body.and_then(|b| b.reason);
276    let terminated = state
277        .engine
278        .terminate_workflow(&id, reason.as_deref())
279        .await?;
280    if terminated {
281        Ok(axum::http::StatusCode::OK)
282    } else {
283        Err(AppError::NotFound(format!(
284            "workflow {id} not found or already terminal"
285        )))
286    }
287}
288
289#[utoipa::path(
290    get, path = "/api/v1/workflows/{id}/children",
291    tag = "workflows",
292    params(("id" = String, Path, description = "Parent workflow ID")),
293    responses(
294        (status = 200, description = "Child workflows", body = Vec<WorkflowRecord>),
295    ),
296)]
297pub async fn list_children<S: WorkflowStore>(
298    State(state): State<Arc<AppState<S>>>,
299    Path(id): Path<String>,
300) -> Result<Json<Vec<serde_json::Value>>, AppError> {
301    let children = state.engine.list_child_workflows(&id).await?;
302    let json: Vec<serde_json::Value> = children
303        .into_iter()
304        .map(|w| serde_json::to_value(w).unwrap_or_default())
305        .collect();
306    Ok(Json(json))
307}
308
309#[derive(Deserialize, ToSchema)]
310pub struct ContinueAsNewBody {
311    /// New input for the continued workflow run
312    pub input: Option<serde_json::Value>,
313}
314
315#[utoipa::path(
316    post, path = "/api/v1/workflows/{id}/continue-as-new",
317    tag = "workflows",
318    params(("id" = String, Path, description = "Workflow ID to continue")),
319    request_body = ContinueAsNewBody,
320    responses(
321        (status = 201, description = "New workflow run started", body = WorkflowResponse),
322    ),
323)]
324pub async fn continue_as_new<S: WorkflowStore>(
325    State(state): State<Arc<AppState<S>>>,
326    Path(id): Path<String>,
327    Json(body): Json<ContinueAsNewBody>,
328) -> Result<(axum::http::StatusCode, Json<WorkflowResponse>), AppError> {
329    let input = body.input.map(|v| v.to_string());
330    let wf = state
331        .engine
332        .continue_as_new(&id, input.as_deref())
333        .await?;
334
335    Ok((
336        axum::http::StatusCode::CREATED,
337        Json(WorkflowResponse {
338            workflow_id: wf.id,
339            run_id: wf.run_id,
340            status: wf.status,
341        }),
342    ))
343}
344
345// ── Live state (register_query) ─────────────────────────────
346
347/// Read the latest snapshot of a workflow's query-handler state.
348///
349/// Populated by workflow code that calls `ctx:register_query(name, fn)` —
350/// each worker replay re-evaluates the registered handlers and persists the
351/// combined result. Returns 404 if no workflow run has written a snapshot
352/// yet (either the workflow hasn't registered any queries, or the first
353/// replay hasn't completed).
354#[utoipa::path(
355    get, path = "/api/v1/workflows/{id}/state",
356    tag = "workflows",
357    params(("id" = String, Path, description = "Workflow ID")),
358    responses(
359        (status = 200, description = "Latest state snapshot"),
360        (status = 404, description = "No snapshot recorded for this workflow"),
361    ),
362)]
363pub async fn get_workflow_state<S: WorkflowStore>(
364    State(state): State<Arc<AppState<S>>>,
365    Path(id): Path<String>,
366) -> Result<Json<serde_json::Value>, AppError> {
367    let snapshot = state
368        .engine
369        .get_latest_snapshot(&id)
370        .await?
371        .ok_or_else(|| AppError::NotFound(format!("state for workflow {id}")))?;
372
373    let parsed: serde_json::Value =
374        serde_json::from_str(&snapshot.state_json).unwrap_or(serde_json::Value::Null);
375
376    Ok(Json(serde_json::json!({
377        "state": parsed,
378        "event_seq": snapshot.event_seq,
379        "created_at": snapshot.created_at,
380    })))
381}
382
383/// Read a single named query result from a workflow's latest snapshot.
384///
385/// Returns the value under the given key in the latest snapshot's state
386/// object, or 404 if no snapshot exists or the key is absent.
387#[utoipa::path(
388    get, path = "/api/v1/workflows/{id}/state/{name}",
389    tag = "workflows",
390    params(
391        ("id" = String, Path, description = "Workflow ID"),
392        ("name" = String, Path, description = "Query handler name"),
393    ),
394    responses(
395        (status = 200, description = "Query value"),
396        (status = 404, description = "No snapshot or key not present"),
397    ),
398)]
399pub async fn get_workflow_state_by_name<S: WorkflowStore>(
400    State(state): State<Arc<AppState<S>>>,
401    Path((id, name)): Path<(String, String)>,
402) -> Result<Json<serde_json::Value>, AppError> {
403    let snapshot = state
404        .engine
405        .get_latest_snapshot(&id)
406        .await?
407        .ok_or_else(|| AppError::NotFound(format!("state for workflow {id}")))?;
408
409    let parsed: serde_json::Value =
410        serde_json::from_str(&snapshot.state_json).unwrap_or(serde_json::Value::Null);
411
412    let value = parsed
413        .get(&name)
414        .cloned()
415        .ok_or_else(|| AppError::NotFound(format!("query '{name}' for workflow {id}")))?;
416
417    Ok(Json(serde_json::json!({
418        "value": value,
419        "event_seq": snapshot.event_seq,
420        "created_at": snapshot.created_at,
421    })))
422}
423
424// ── Error type ──────────────────────────────────────────────
425
426pub enum AppError {
427    Internal(anyhow::Error),
428    NotFound(String),
429}
430
431impl From<anyhow::Error> for AppError {
432    fn from(e: anyhow::Error) -> Self {
433        Self::Internal(e)
434    }
435}
436
437impl From<serde_json::Error> for AppError {
438    fn from(e: serde_json::Error) -> Self {
439        Self::Internal(e.into())
440    }
441}
442
443impl axum::response::IntoResponse for AppError {
444    fn into_response(self) -> axum::response::Response {
445        match self {
446            Self::Internal(e) => {
447                tracing::error!("Internal error: {e}");
448                (
449                    axum::http::StatusCode::INTERNAL_SERVER_ERROR,
450                    Json(serde_json::json!({ "error": e.to_string() })),
451                )
452                    .into_response()
453            }
454            Self::NotFound(msg) => (
455                axum::http::StatusCode::NOT_FOUND,
456                Json(serde_json::json!({ "error": format!("not found: {msg}") })),
457            )
458                .into_response(),
459        }
460    }
461}
462
463// Type alias for utoipa references (the actual type is WorkflowRecord from types.rs)
464use crate::types::{WorkflowEvent, WorkflowRecord};