Skip to main content

assay_workflow/api/
activities.rs

1//! Activity scheduling and lookup endpoints.
2//!
3//! These endpoints are the public face of `Engine::schedule_activity` and
4//! `Engine::get_activity` — workflows (running on a worker) call POST to
5//! schedule the next activity, and the worker polls GET while waiting for
6//! the result. Idempotency on `(workflow_id, seq)` makes it safe to retry.
7
8use std::sync::Arc;
9
10use axum::extract::{Path, State};
11use axum::routing::{get, post};
12use axum::{Json, Router};
13use serde::Deserialize;
14use utoipa::ToSchema;
15
16use crate::api::workflows::AppError;
17use crate::api::AppState;
18use crate::store::WorkflowStore;
19use crate::types::{ScheduleActivityOpts, WorkflowActivity};
20
21pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
22    Router::new()
23        .route("/workflows/{id}/activities", post(schedule_activity))
24        .route("/activities/{id}", get(get_activity))
25}
26
27#[derive(Deserialize, ToSchema)]
28pub struct ScheduleActivityRequest {
29    /// Activity name (the worker matches this to a registered handler)
30    pub name: String,
31    /// Sequence number relative to the workflow. Used for idempotency:
32    /// scheduling the same `(workflow_id, seq)` twice is a no-op on the
33    /// second call. Workflows assign sequence numbers in execution order.
34    pub seq: i32,
35    /// Task queue to route the activity to (workers poll a specific queue)
36    pub task_queue: String,
37    /// JSON-serialisable input passed to the activity handler
38    pub input: Option<serde_json::Value>,
39    /// Maximum attempts before the activity is marked `FAILED` (default 3)
40    pub max_attempts: Option<i32>,
41    /// Initial retry backoff in seconds (default 1.0)
42    pub initial_interval_secs: Option<f64>,
43    /// Exponential backoff coefficient (default 2.0)
44    pub backoff_coefficient: Option<f64>,
45    /// Total time the activity has to complete before being failed (default 300)
46    pub start_to_close_secs: Option<f64>,
47    /// If set, an activity that hasn't heartbeated within this window is auto-failed
48    pub heartbeat_timeout_secs: Option<f64>,
49}
50
51#[utoipa::path(
52    post, path = "/api/v1/workflows/{id}/activities",
53    tag = "activities",
54    params(("id" = String, Path, description = "Workflow ID")),
55    request_body = ScheduleActivityRequest,
56    responses(
57        (status = 201, description = "Activity scheduled", body = WorkflowActivity),
58    ),
59)]
60pub async fn schedule_activity<S: WorkflowStore>(
61    State(state): State<Arc<AppState<S>>>,
62    Path(workflow_id): Path<String>,
63    Json(req): Json<ScheduleActivityRequest>,
64) -> Result<(axum::http::StatusCode, Json<WorkflowActivity>), AppError> {
65    let input = req.input.map(|v| v.to_string());
66    let opts = ScheduleActivityOpts {
67        max_attempts: req.max_attempts,
68        initial_interval_secs: req.initial_interval_secs,
69        backoff_coefficient: req.backoff_coefficient,
70        start_to_close_secs: req.start_to_close_secs,
71        heartbeat_timeout_secs: req.heartbeat_timeout_secs,
72    };
73    let act = state
74        .engine
75        .schedule_activity(
76            &workflow_id,
77            req.seq,
78            &req.name,
79            input.as_deref(),
80            &req.task_queue,
81            opts,
82        )
83        .await?;
84    Ok((axum::http::StatusCode::CREATED, Json(act)))
85}
86
87#[utoipa::path(
88    get, path = "/api/v1/activities/{id}",
89    tag = "activities",
90    params(("id" = i64, Path, description = "Activity ID")),
91    responses(
92        (status = 200, description = "Activity record", body = WorkflowActivity),
93        (status = 404, description = "Not found"),
94    ),
95)]
96pub async fn get_activity<S: WorkflowStore>(
97    State(state): State<Arc<AppState<S>>>,
98    Path(id): Path<i64>,
99) -> Result<Json<WorkflowActivity>, AppError> {
100    match state.engine.get_activity(id).await? {
101        Some(a) => Ok(Json(a)),
102        None => Err(AppError::NotFound(format!("activity {id} not found"))),
103    }
104}