Skip to main content

assay_workflow/api/
workflow_tasks.rs

//! Workflow-task dispatch endpoints (Phase 9).
//!
//! A "workflow task" represents "this workflow has new events that need a
//! worker to run the workflow handler against." It's distinct from an
//! "activity task" (which runs concrete activity code). The dispatch loop:
//!
//! 1. The engine sets `needs_dispatch=true` on a workflow when something
//!    workflow-visible happens (started, activity completed, timer fired,
//!    signal arrived).
//! 2. A worker calls `POST /workflow-tasks/poll` to claim the next
//!    dispatchable workflow on its queue. The response carries the workflow
//!    id, type, input, and full event history for replay.
//! 3. The worker invokes the handler in a coroutine that yields commands
//!    (ScheduleActivity, CompleteWorkflow, FailWorkflow, etc.) instead of
//!    making side effects directly.
//! 4. The worker calls `POST /workflow-tasks/{id}/commands` to submit the
//!    batch. The engine processes each command transactionally and releases
//!    the worker's claim on the workflow task.
19
20use std::sync::Arc;
21
22use axum::extract::{Path, State};
23use axum::routing::post;
24use axum::{Json, Router};
25use serde::Deserialize;
26use utoipa::ToSchema;
27
28use crate::api::workflows::AppError;
29use crate::api::AppState;
30use crate::store::WorkflowStore;
31
32pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
33    Router::new()
34        .route("/workflow-tasks/poll", post(poll_workflow_task))
35        .route("/workflow-tasks/{id}/commands", post(submit_commands))
36}
37
38#[derive(Deserialize, ToSchema)]
39pub struct PollWorkflowTaskRequest {
40    pub queue: String,
41    pub worker_id: String,
42}
43
44/// Claim a dispatchable workflow on the requested queue. Response is `null`
45/// when nothing is available (worker should sleep + retry).
46#[utoipa::path(
47    post, path = "/api/v1/workflow-tasks/poll",
48    tag = "workflow-tasks",
49    request_body = PollWorkflowTaskRequest,
50    responses(
51        (status = 200, description = "Workflow task or null"),
52    ),
53)]
54pub async fn poll_workflow_task<S: WorkflowStore>(
55    State(state): State<Arc<AppState<S>>>,
56    Json(req): Json<PollWorkflowTaskRequest>,
57) -> Result<Json<serde_json::Value>, AppError> {
58    match state
59        .engine
60        .claim_workflow_task(&req.queue, &req.worker_id)
61        .await?
62    {
63        Some((wf, history)) => Ok(Json(serde_json::json!({
64            "workflow_id": wf.id,
65            "namespace": wf.namespace,
66            "workflow_type": wf.workflow_type,
67            "task_queue": wf.task_queue,
68            "input": wf.input.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
69            "history": history.iter().map(|e| serde_json::json!({
70                "seq": e.seq,
71                "event_type": e.event_type,
72                "payload": e.payload.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
73                "timestamp": e.timestamp,
74            })).collect::<Vec<_>>(),
75        }))),
76        None => Ok(Json(serde_json::Value::Null)),
77    }
78}
79
80#[derive(Deserialize, ToSchema)]
81pub struct SubmitCommandsRequest {
82    pub worker_id: String,
83    pub commands: Vec<serde_json::Value>,
84}
85
86/// Submit a batch of commands a worker produced from running the workflow
87/// handler. Each command is processed transactionally (ScheduleActivity
88/// inserts a row + appends ActivityScheduled, CompleteWorkflow flips the
89/// status + appends WorkflowCompleted, etc.) and the worker's claim is
90/// released on success.
91#[utoipa::path(
92    post, path = "/api/v1/workflow-tasks/{id}/commands",
93    tag = "workflow-tasks",
94    params(("id" = String, Path, description = "Workflow ID")),
95    request_body = SubmitCommandsRequest,
96    responses((status = 200, description = "Commands processed; lease released")),
97)]
98pub async fn submit_commands<S: WorkflowStore>(
99    State(state): State<Arc<AppState<S>>>,
100    Path(workflow_id): Path<String>,
101    Json(req): Json<SubmitCommandsRequest>,
102) -> Result<axum::http::StatusCode, AppError> {
103    state
104        .engine
105        .submit_workflow_commands(&workflow_id, &req.worker_id, &req.commands)
106        .await?;
107    Ok(axum::http::StatusCode::OK)
108}