assay_workflow/api/
workflow_tasks.rs1use std::sync::Arc;
21
22use axum::extract::{Path, State};
23use axum::routing::post;
24use axum::{Json, Router};
25use serde::Deserialize;
26use utoipa::ToSchema;
27
28use crate::api::workflows::AppError;
29use crate::api::AppState;
30use crate::store::WorkflowStore;
31
32pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
33 Router::new()
34 .route("/workflow-tasks/poll", post(poll_workflow_task))
35 .route("/workflow-tasks/{id}/commands", post(submit_commands))
36}
37
38#[derive(Deserialize, ToSchema)]
39pub struct PollWorkflowTaskRequest {
40 pub queue: String,
41 pub worker_id: String,
42}
43
44#[utoipa::path(
47 post, path = "/api/v1/workflow-tasks/poll",
48 tag = "workflow-tasks",
49 request_body = PollWorkflowTaskRequest,
50 responses(
51 (status = 200, description = "Workflow task or null"),
52 ),
53)]
54pub async fn poll_workflow_task<S: WorkflowStore>(
55 State(state): State<Arc<AppState<S>>>,
56 Json(req): Json<PollWorkflowTaskRequest>,
57) -> Result<Json<serde_json::Value>, AppError> {
58 match state
59 .engine
60 .claim_workflow_task(&req.queue, &req.worker_id)
61 .await?
62 {
63 Some((wf, history)) => Ok(Json(serde_json::json!({
64 "workflow_id": wf.id,
65 "namespace": wf.namespace,
66 "workflow_type": wf.workflow_type,
67 "task_queue": wf.task_queue,
68 "input": wf.input.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
69 "history": history.iter().map(|e| serde_json::json!({
70 "seq": e.seq,
71 "event_type": e.event_type,
72 "payload": e.payload.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
73 "timestamp": e.timestamp,
74 })).collect::<Vec<_>>(),
75 }))),
76 None => Ok(Json(serde_json::Value::Null)),
77 }
78}
79
80#[derive(Deserialize, ToSchema)]
81pub struct SubmitCommandsRequest {
82 pub worker_id: String,
83 pub commands: Vec<serde_json::Value>,
84}
85
86#[utoipa::path(
92 post, path = "/api/v1/workflow-tasks/{id}/commands",
93 tag = "workflow-tasks",
94 params(("id" = String, Path, description = "Workflow ID")),
95 request_body = SubmitCommandsRequest,
96 responses((status = 200, description = "Commands processed; lease released")),
97)]
98pub async fn submit_commands<S: WorkflowStore>(
99 State(state): State<Arc<AppState<S>>>,
100 Path(workflow_id): Path<String>,
101 Json(req): Json<SubmitCommandsRequest>,
102) -> Result<axum::http::StatusCode, AppError> {
103 state
104 .engine
105 .submit_workflow_commands(&workflow_id, &req.worker_id, &req.commands)
106 .await?;
107 Ok(axum::http::StatusCode::OK)
108}