use std::sync::Arc;
use axum::extract::{Path, State};
use axum::routing::post;
use axum::{Json, Router};
use serde::Deserialize;
use utoipa::ToSchema;
use crate::api::workflows::AppError;
use crate::api::AppState;
use crate::store::WorkflowStore;
/// Builds the router for the worker-facing workflow-task endpoints.
///
/// Registers two `POST` routes:
/// - `/workflow-tasks/poll` handled by [`poll_workflow_task`]
/// - `/workflow-tasks/{id}/commands` handled by [`submit_commands`]
///
/// The returned router expects `Arc<AppState<S>>` as its state.
pub fn router<S>() -> Router<Arc<AppState<S>>>
where
    S: WorkflowStore + 'static,
{
    let poll_route = post(poll_workflow_task::<S>);
    let commands_route = post(submit_commands::<S>);

    Router::new()
        .route("/workflow-tasks/poll", poll_route)
        .route("/workflow-tasks/{id}/commands", commands_route)
}
/// JSON request body accepted by the workflow-task poll endpoint.
#[derive(Deserialize, ToSchema)]
pub struct PollWorkflowTaskRequest {
/// Queue name; the poll handler passes it to the engine's
/// `claim_workflow_task` as the first argument.
pub queue: String,
/// Identifier of the polling worker; passed to `claim_workflow_task`
/// as the second argument.
pub worker_id: String,
}
#[utoipa::path(
post, path = "/api/v1/workflow-tasks/poll",
tag = "workflow-tasks",
request_body = PollWorkflowTaskRequest,
responses(
(status = 200, description = "Workflow task or null"),
),
)]
pub async fn poll_workflow_task<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Json(req): Json<PollWorkflowTaskRequest>,
) -> Result<Json<serde_json::Value>, AppError> {
match state
.engine
.claim_workflow_task(&req.queue, &req.worker_id)
.await?
{
Some((wf, history)) => Ok(Json(serde_json::json!({
"workflow_id": wf.id,
"namespace": wf.namespace,
"workflow_type": wf.workflow_type,
"task_queue": wf.task_queue,
"input": wf.input.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
"history": history.iter().map(|e| serde_json::json!({
"seq": e.seq,
"event_type": e.event_type,
"payload": e.payload.as_deref().and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
"timestamp": e.timestamp,
})).collect::<Vec<_>>(),
}))),
None => Ok(Json(serde_json::Value::Null)),
}
}
/// JSON request body accepted by the workflow-task commands endpoint.
#[derive(Deserialize, ToSchema)]
pub struct SubmitCommandsRequest {
/// Identifier of the submitting worker; forwarded to the engine's
/// `submit_workflow_commands`.
pub worker_id: String,
/// Commands as untyped JSON values; the handler forwards them to the
/// engine without inspecting them.
pub commands: Vec<serde_json::Value>,
}
#[utoipa::path(
post, path = "/api/v1/workflow-tasks/{id}/commands",
tag = "workflow-tasks",
params(("id" = String, Path, description = "Workflow ID")),
request_body = SubmitCommandsRequest,
responses((status = 200, description = "Commands processed; lease released")),
)]
pub async fn submit_commands<S: WorkflowStore>(
State(state): State<Arc<AppState<S>>>,
Path(workflow_id): Path<String>,
Json(req): Json<SubmitCommandsRequest>,
) -> Result<axum::http::StatusCode, AppError> {
state
.engine
.submit_workflow_commands(&workflow_id, &req.worker_id, &req.commands)
.await?;
Ok(axum::http::StatusCode::OK)
}