Skip to main content

mockforge_registry_server/handlers/
flows.rs

1//! Flows handlers — unified scenario/orchestration/state-machine/chain
2//! resource (cloud-enablement task #9 / Phase 1).
3//!
4//! Phase 1 surface: CRUD over `flows` + read access to `flow_versions`.
5//! Triggering a run reuses the #4 test_runs lifecycle with `kind` =
6//! 'scenario' / 'orchestration' / 'state_machine' / 'chain' — wiring
7//! that path lives in a follow-up slice once the worker pool is up.
8//!
9//! Routes:
10//!   GET    /api/v1/workspaces/{workspace_id}/flows[?kind=]
11//!   POST   /api/v1/workspaces/{workspace_id}/flows
12//!   GET    /api/v1/flows/{id}                              (with current version)
13//!   PATCH  /api/v1/flows/{id}                              (rename / description)
14//!   DELETE /api/v1/flows/{id}
15//!   POST   /api/v1/flows/{id}/versions                     (save new version)
16//!   GET    /api/v1/flows/{id}/versions                     (history)
17//!   GET    /api/v1/flow-versions/{version_id}
18
19use axum::{
20    extract::{Path, Query, State},
21    http::HeaderMap,
22    Json,
23};
24use mockforge_registry_core::models::flow::CreateFlow;
25use mockforge_registry_core::models::test_run::EnqueueTestRun;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::{
30    error::{ApiError, ApiResult},
31    middleware::{resolve_org_context, AuthUser},
32    models::{CloudWorkspace, Flow, FlowVersion, TestRun},
33    AppState,
34};
35
36#[derive(Debug, Deserialize)]
37pub struct ListFlowsQuery {
38    /// Optional kind filter, e.g. ?kind=scenario.
39    #[serde(default)]
40    pub kind: Option<String>,
41}
42
43/// `GET /api/v1/workspaces/{workspace_id}/flows`
44pub async fn list_flows(
45    State(state): State<AppState>,
46    AuthUser(user_id): AuthUser,
47    Path(workspace_id): Path<Uuid>,
48    Query(query): Query<ListFlowsQuery>,
49    headers: HeaderMap,
50) -> ApiResult<Json<Vec<Flow>>> {
51    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
52    let flows = Flow::list_by_workspace(state.db.pool(), workspace_id, query.kind.as_deref())
53        .await
54        .map_err(ApiError::Database)?;
55    Ok(Json(flows))
56}
57
58#[derive(Debug, Deserialize)]
59pub struct CreateFlowRequest {
60    pub kind: String,
61    pub name: String,
62    #[serde(default)]
63    pub description: Option<String>,
64    /// Initial version's config payload.
65    pub config: serde_json::Value,
66}
67
68#[derive(Debug, Serialize)]
69pub struct FlowWithVersion {
70    #[serde(flatten)]
71    pub flow: Flow,
72    pub version: FlowVersion,
73}
74
75/// `POST /api/v1/workspaces/{workspace_id}/flows`
76pub async fn create_flow(
77    State(state): State<AppState>,
78    AuthUser(user_id): AuthUser,
79    Path(workspace_id): Path<Uuid>,
80    headers: HeaderMap,
81    Json(request): Json<CreateFlowRequest>,
82) -> ApiResult<Json<FlowWithVersion>> {
83    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
84
85    if request.name.trim().is_empty() {
86        return Err(ApiError::InvalidRequest("name must not be empty".into()));
87    }
88    if !Flow::is_valid_kind(&request.kind) {
89        return Err(ApiError::InvalidRequest(format!(
90            "kind must be one of: {}",
91            Flow::VALID_KINDS.join(", ")
92        )));
93    }
94
95    let (flow, version) = Flow::create_with_initial_version(
96        state.db.pool(),
97        CreateFlow {
98            workspace_id,
99            kind: &request.kind,
100            name: &request.name,
101            description: request.description.as_deref(),
102            config: &request.config,
103            created_by: Some(user_id),
104        },
105    )
106    .await
107    .map_err(ApiError::Database)?;
108
109    Ok(Json(FlowWithVersion { flow, version }))
110}
111
112/// `GET /api/v1/flows/{id}` — returns the flow plus its current version,
113/// so the editor can render the canvas in one round-trip.
114pub async fn get_flow(
115    State(state): State<AppState>,
116    AuthUser(user_id): AuthUser,
117    Path(id): Path<Uuid>,
118    headers: HeaderMap,
119) -> ApiResult<Json<FlowWithVersion>> {
120    let flow = load_authorized_flow(&state, user_id, &headers, id).await?;
121    let version_id = flow
122        .current_version_id
123        .ok_or_else(|| ApiError::Internal(anyhow::anyhow!("Flow has no current version")))?;
124    let version = FlowVersion::find_by_id(state.db.pool(), version_id)
125        .await
126        .map_err(ApiError::Database)?
127        .ok_or_else(|| ApiError::Internal(anyhow::anyhow!("Flow version row missing")))?;
128    Ok(Json(FlowWithVersion { flow, version }))
129}
130
131#[derive(Debug, Deserialize)]
132pub struct UpdateFlowRequest {
133    #[serde(default)]
134    pub name: Option<String>,
135    /// Outer Option = "field present"; inner = "set to NULL".
136    #[serde(default, deserialize_with = "deserialize_double_option")]
137    pub description: Option<Option<String>>,
138}
139
140/// `PATCH /api/v1/flows/{id}` — only renames/edits the description.
141/// Saving config changes goes through `POST /flows/{id}/versions` so the
142/// version history is preserved.
143pub async fn update_flow(
144    State(state): State<AppState>,
145    AuthUser(user_id): AuthUser,
146    Path(id): Path<Uuid>,
147    headers: HeaderMap,
148    Json(request): Json<UpdateFlowRequest>,
149) -> ApiResult<Json<Flow>> {
150    load_authorized_flow(&state, user_id, &headers, id).await?;
151
152    let updated = Flow::rename(
153        state.db.pool(),
154        id,
155        request.name.as_deref(),
156        request.description.as_ref().map(|d| d.as_deref()),
157    )
158    .await
159    .map_err(ApiError::Database)?
160    .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
161    Ok(Json(updated))
162}
163
164/// `DELETE /api/v1/flows/{id}` — cascade-deletes all versions.
165pub async fn delete_flow(
166    State(state): State<AppState>,
167    AuthUser(user_id): AuthUser,
168    Path(id): Path<Uuid>,
169    headers: HeaderMap,
170) -> ApiResult<Json<serde_json::Value>> {
171    load_authorized_flow(&state, user_id, &headers, id).await?;
172
173    let deleted = Flow::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
174    if !deleted {
175        return Err(ApiError::InvalidRequest("Flow not found".into()));
176    }
177    Ok(Json(serde_json::json!({ "deleted": true })))
178}
179
180#[derive(Debug, Deserialize)]
181pub struct SaveVersionRequest {
182    pub config: serde_json::Value,
183}
184
185/// `POST /api/v1/flows/{id}/versions`
186///
187/// Inserts a new flow_version and points `flows.current_version_id` at
188/// it in the same transaction. Old versions stay around for rollback.
189pub async fn save_flow_version(
190    State(state): State<AppState>,
191    AuthUser(user_id): AuthUser,
192    Path(id): Path<Uuid>,
193    headers: HeaderMap,
194    Json(request): Json<SaveVersionRequest>,
195) -> ApiResult<Json<FlowVersion>> {
196    load_authorized_flow(&state, user_id, &headers, id).await?;
197
198    let version = Flow::save_new_version(state.db.pool(), id, &request.config, Some(user_id))
199        .await
200        .map_err(ApiError::Database)?;
201    Ok(Json(version))
202}
203
204/// `POST /api/v1/flows/{id}/runs`
205///
206/// Triggers a flow execution. Reuses the #4 test_runs lifecycle with
207/// `kind` = the flow's own kind (scenario / orchestration / state_machine
208/// / chain), so it shares the runner pool, concurrency cap, and
209/// runner_seconds metering with regular test runs.
210pub async fn trigger_run(
211    State(state): State<AppState>,
212    AuthUser(user_id): AuthUser,
213    Path(id): Path<Uuid>,
214    headers: HeaderMap,
215) -> ApiResult<Json<TestRun>> {
216    let flow = load_authorized_flow(&state, user_id, &headers, id).await?;
217    let workspace = CloudWorkspace::find_by_id(state.db.pool(), flow.workspace_id)
218        .await?
219        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
220
221    let org = mockforge_registry_core::models::Organization::find_by_id(
222        state.db.pool(),
223        workspace.org_id,
224    )
225    .await
226    .map_err(|_| ApiError::Internal(anyhow::anyhow!("DB error loading org")))?
227    .ok_or_else(|| ApiError::InvalidRequest("Organization not found".into()))?;
228    let limits = crate::handlers::usage::effective_limits(&state, &org).await?;
229    let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
230    if max_concurrent == 0 {
231        return Err(ApiError::ResourceLimitExceeded(
232            "Test execution / flow runs are not enabled on this plan".into(),
233        ));
234    }
235    if max_concurrent > 0 {
236        let inflight = TestRun::count_inflight(state.db.pool(), workspace.org_id)
237            .await
238            .map_err(ApiError::Database)?;
239        if inflight.total() >= max_concurrent {
240            return Err(ApiError::ResourceLimitExceeded(format!(
241                "Concurrent run limit reached ({}/{}).",
242                inflight.total(),
243                max_concurrent,
244            )));
245        }
246    }
247
248    let run = TestRun::enqueue(
249        state.db.pool(),
250        EnqueueTestRun {
251            suite_id: flow.id,
252            org_id: workspace.org_id,
253            kind: &flow.kind,
254            triggered_by: "manual",
255            triggered_by_user: Some(user_id),
256            git_ref: None,
257            git_sha: None,
258        },
259    )
260    .await
261    .map_err(ApiError::Database)?;
262
263    // Push payload includes the flow's current_version_id AND its
264    // config — saves the runner a round trip to fetch what it needs to
265    // execute. Loading the version is best-effort: a missing version
266    // (FK should prevent it but races are possible) is logged and the
267    // run still queues with whatever metadata we have, so the runner
268    // can record the failure with context.
269    let version_config: Option<serde_json::Value> = match flow.current_version_id {
270        Some(vid) => match FlowVersion::find_by_id(state.db.pool(), vid).await {
271            Ok(Some(v)) => Some(v.config),
272            Ok(None) => {
273                tracing::warn!(
274                    flow_id = %flow.id,
275                    version_id = %vid,
276                    "flow.current_version_id points at missing FlowVersion",
277                );
278                None
279            }
280            Err(e) => {
281                tracing::warn!(
282                    flow_id = %flow.id,
283                    error = %e,
284                    "failed to load FlowVersion for run payload",
285                );
286                None
287            }
288        },
289        None => None,
290    };
291
292    if let Err(e) = crate::run_queue::enqueue(
293        state.redis.as_ref(),
294        crate::run_queue::EnqueuedJob {
295            run_id: run.id,
296            org_id: run.org_id,
297            source_id: flow.id,
298            kind: &flow.kind,
299            payload: serde_json::json!({
300                "flow_kind": flow.kind,
301                "flow_name": flow.name,
302                "current_version_id": flow.current_version_id,
303                "config": version_config,
304            }),
305        },
306    )
307    .await
308    {
309        tracing::error!(run_id = %run.id, error = %e, "failed to enqueue flow run");
310    }
311
312    Ok(Json(run))
313}
314
315/// `GET /api/v1/flows/{id}/versions` — full version history, newest first.
316pub async fn list_flow_versions(
317    State(state): State<AppState>,
318    AuthUser(user_id): AuthUser,
319    Path(id): Path<Uuid>,
320    headers: HeaderMap,
321) -> ApiResult<Json<Vec<FlowVersion>>> {
322    load_authorized_flow(&state, user_id, &headers, id).await?;
323    let versions = FlowVersion::list_by_flow(state.db.pool(), id)
324        .await
325        .map_err(ApiError::Database)?;
326    Ok(Json(versions))
327}
328
329/// `GET /api/v1/flow-versions/{version_id}` — fetch a specific older
330/// version for diff/rollback views.
331pub async fn get_flow_version(
332    State(state): State<AppState>,
333    AuthUser(user_id): AuthUser,
334    Path(version_id): Path<Uuid>,
335    headers: HeaderMap,
336) -> ApiResult<Json<FlowVersion>> {
337    let version = FlowVersion::find_by_id(state.db.pool(), version_id)
338        .await
339        .map_err(ApiError::Database)?
340        .ok_or_else(|| ApiError::InvalidRequest("Flow version not found".into()))?;
341    // Authorize against the parent flow.
342    load_authorized_flow(&state, user_id, &headers, version.flow_id).await?;
343    Ok(Json(version))
344}
345
346async fn authorize_workspace(
347    state: &AppState,
348    user_id: Uuid,
349    headers: &HeaderMap,
350    workspace_id: Uuid,
351) -> ApiResult<()> {
352    let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
353        .await?
354        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
355    let ctx = resolve_org_context(state, user_id, headers, None)
356        .await
357        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
358    if ctx.org_id != workspace.org_id {
359        return Err(ApiError::InvalidRequest("Workspace not found".into()));
360    }
361    Ok(())
362}
363
364async fn load_authorized_flow(
365    state: &AppState,
366    user_id: Uuid,
367    headers: &HeaderMap,
368    id: Uuid,
369) -> ApiResult<Flow> {
370    let flow = Flow::find_by_id(state.db.pool(), id)
371        .await
372        .map_err(ApiError::Database)?
373        .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
374    let workspace = CloudWorkspace::find_by_id(state.db.pool(), flow.workspace_id)
375        .await?
376        .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
377    let ctx = resolve_org_context(state, user_id, headers, None)
378        .await
379        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
380    if ctx.org_id != workspace.org_id {
381        return Err(ApiError::InvalidRequest("Flow not found".into()));
382    }
383    Ok(flow)
384}
385
386fn deserialize_double_option<'de, T, D>(deserializer: D) -> Result<Option<Option<T>>, D::Error>
387where
388    T: Deserialize<'de>,
389    D: serde::Deserializer<'de>,
390{
391    Option::<T>::deserialize(deserializer).map(Some)
392}