1use axum::{
20 extract::{Path, Query, State},
21 http::HeaderMap,
22 Json,
23};
24use mockforge_registry_core::models::flow::CreateFlow;
25use mockforge_registry_core::models::test_run::EnqueueTestRun;
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::{
30 error::{ApiError, ApiResult},
31 middleware::{resolve_org_context, AuthUser},
32 models::{CloudWorkspace, Flow, FlowVersion, TestRun},
33 AppState,
34};
35
/// Query-string parameters accepted by `list_flows`.
#[derive(Debug, Deserialize)]
pub struct ListFlowsQuery {
    /// Optional flow kind, `None` when the parameter is absent. `list_flows` forwards it
    /// to `Flow::list_by_workspace`.
    // NOTE(review): presumably restricts the listing to flows of that kind — confirm
    // against `Flow::list_by_workspace`.
    #[serde(default)]
    pub kind: Option<String>,
}
42
43pub async fn list_flows(
45 State(state): State<AppState>,
46 AuthUser(user_id): AuthUser,
47 Path(workspace_id): Path<Uuid>,
48 Query(query): Query<ListFlowsQuery>,
49 headers: HeaderMap,
50) -> ApiResult<Json<Vec<Flow>>> {
51 authorize_workspace(&state, user_id, &headers, workspace_id).await?;
52 let flows = Flow::list_by_workspace(state.db.pool(), workspace_id, query.kind.as_deref())
53 .await
54 .map_err(ApiError::Database)?;
55 Ok(Json(flows))
56}
57
/// JSON body accepted by `create_flow`.
#[derive(Debug, Deserialize)]
pub struct CreateFlowRequest {
    /// Flow kind; `create_flow` rejects values for which `Flow::is_valid_kind` is false.
    pub kind: String,
    /// Flow name; `create_flow` rejects names that are empty after trimming.
    pub name: String,
    /// Optional description; `None` when the field is omitted.
    #[serde(default)]
    pub description: Option<String>,
    /// Arbitrary JSON configuration handed to `Flow::create_with_initial_version`.
    // NOTE(review): presumably becomes the config of the flow's first version — confirm.
    pub config: serde_json::Value,
}
67
/// Response payload that pairs a flow with one of its versions.
///
/// When serialized, the flow's own fields are flattened into the top-level object and
/// the version is nested under the `version` key.
#[derive(Debug, Serialize)]
pub struct FlowWithVersion {
    /// The flow itself; its fields appear at the top level of the serialized output.
    #[serde(flatten)]
    pub flow: Flow,
    /// The version returned alongside the flow.
    pub version: FlowVersion,
}
74
75pub async fn create_flow(
77 State(state): State<AppState>,
78 AuthUser(user_id): AuthUser,
79 Path(workspace_id): Path<Uuid>,
80 headers: HeaderMap,
81 Json(request): Json<CreateFlowRequest>,
82) -> ApiResult<Json<FlowWithVersion>> {
83 authorize_workspace(&state, user_id, &headers, workspace_id).await?;
84
85 if request.name.trim().is_empty() {
86 return Err(ApiError::InvalidRequest("name must not be empty".into()));
87 }
88 if !Flow::is_valid_kind(&request.kind) {
89 return Err(ApiError::InvalidRequest(format!(
90 "kind must be one of: {}",
91 Flow::VALID_KINDS.join(", ")
92 )));
93 }
94
95 let (flow, version) = Flow::create_with_initial_version(
96 state.db.pool(),
97 CreateFlow {
98 workspace_id,
99 kind: &request.kind,
100 name: &request.name,
101 description: request.description.as_deref(),
102 config: &request.config,
103 created_by: Some(user_id),
104 },
105 )
106 .await
107 .map_err(ApiError::Database)?;
108
109 Ok(Json(FlowWithVersion { flow, version }))
110}
111
112pub async fn get_flow(
115 State(state): State<AppState>,
116 AuthUser(user_id): AuthUser,
117 Path(id): Path<Uuid>,
118 headers: HeaderMap,
119) -> ApiResult<Json<FlowWithVersion>> {
120 let flow = load_authorized_flow(&state, user_id, &headers, id).await?;
121 let version_id = flow
122 .current_version_id
123 .ok_or_else(|| ApiError::Internal(anyhow::anyhow!("Flow has no current version")))?;
124 let version = FlowVersion::find_by_id(state.db.pool(), version_id)
125 .await
126 .map_err(ApiError::Database)?
127 .ok_or_else(|| ApiError::Internal(anyhow::anyhow!("Flow version row missing")))?;
128 Ok(Json(FlowWithVersion { flow, version }))
129}
130
/// JSON body accepted by `update_flow`; every field is optional.
#[derive(Debug, Deserialize)]
pub struct UpdateFlowRequest {
    /// New name; `None` when the field is omitted or `null`.
    #[serde(default)]
    pub name: Option<String>,
    /// Three-state description: `None` when the key is absent (via `default`),
    /// `Some(None)` when it is explicitly `null`, and `Some(Some(text))` when a string
    /// is supplied (via `deserialize_double_option`).
    // NOTE(review): presumably `Flow::rename` leaves the description untouched for `None`
    // and clears it for `Some(None)` — confirm against that method.
    #[serde(default, deserialize_with = "deserialize_double_option")]
    pub description: Option<Option<String>>,
}
139
140pub async fn update_flow(
144 State(state): State<AppState>,
145 AuthUser(user_id): AuthUser,
146 Path(id): Path<Uuid>,
147 headers: HeaderMap,
148 Json(request): Json<UpdateFlowRequest>,
149) -> ApiResult<Json<Flow>> {
150 load_authorized_flow(&state, user_id, &headers, id).await?;
151
152 let updated = Flow::rename(
153 state.db.pool(),
154 id,
155 request.name.as_deref(),
156 request.description.as_ref().map(|d| d.as_deref()),
157 )
158 .await
159 .map_err(ApiError::Database)?
160 .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
161 Ok(Json(updated))
162}
163
164pub async fn delete_flow(
166 State(state): State<AppState>,
167 AuthUser(user_id): AuthUser,
168 Path(id): Path<Uuid>,
169 headers: HeaderMap,
170) -> ApiResult<Json<serde_json::Value>> {
171 load_authorized_flow(&state, user_id, &headers, id).await?;
172
173 let deleted = Flow::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
174 if !deleted {
175 return Err(ApiError::InvalidRequest("Flow not found".into()));
176 }
177 Ok(Json(serde_json::json!({ "deleted": true })))
178}
179
/// JSON body accepted by `save_flow_version`.
#[derive(Debug, Deserialize)]
pub struct SaveVersionRequest {
    /// Arbitrary JSON configuration handed to `Flow::save_new_version`.
    pub config: serde_json::Value,
}
184
185pub async fn save_flow_version(
190 State(state): State<AppState>,
191 AuthUser(user_id): AuthUser,
192 Path(id): Path<Uuid>,
193 headers: HeaderMap,
194 Json(request): Json<SaveVersionRequest>,
195) -> ApiResult<Json<FlowVersion>> {
196 load_authorized_flow(&state, user_id, &headers, id).await?;
197
198 let version = Flow::save_new_version(state.db.pool(), id, &request.config, Some(user_id))
199 .await
200 .map_err(ApiError::Database)?;
201 Ok(Json(version))
202}
203
204pub async fn trigger_run(
211 State(state): State<AppState>,
212 AuthUser(user_id): AuthUser,
213 Path(id): Path<Uuid>,
214 headers: HeaderMap,
215) -> ApiResult<Json<TestRun>> {
216 let flow = load_authorized_flow(&state, user_id, &headers, id).await?;
217 let workspace = CloudWorkspace::find_by_id(state.db.pool(), flow.workspace_id)
218 .await?
219 .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
220
221 let org = mockforge_registry_core::models::Organization::find_by_id(
222 state.db.pool(),
223 workspace.org_id,
224 )
225 .await
226 .map_err(|_| ApiError::Internal(anyhow::anyhow!("DB error loading org")))?
227 .ok_or_else(|| ApiError::InvalidRequest("Organization not found".into()))?;
228 let limits = crate::handlers::usage::effective_limits(&state, &org).await?;
229 let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
230 if max_concurrent == 0 {
231 return Err(ApiError::ResourceLimitExceeded(
232 "Test execution / flow runs are not enabled on this plan".into(),
233 ));
234 }
235 if max_concurrent > 0 {
236 let inflight = TestRun::count_inflight(state.db.pool(), workspace.org_id)
237 .await
238 .map_err(ApiError::Database)?;
239 if inflight.total() >= max_concurrent {
240 return Err(ApiError::ResourceLimitExceeded(format!(
241 "Concurrent run limit reached ({}/{}).",
242 inflight.total(),
243 max_concurrent,
244 )));
245 }
246 }
247
248 let run = TestRun::enqueue(
249 state.db.pool(),
250 EnqueueTestRun {
251 suite_id: flow.id,
252 org_id: workspace.org_id,
253 kind: &flow.kind,
254 triggered_by: "manual",
255 triggered_by_user: Some(user_id),
256 git_ref: None,
257 git_sha: None,
258 },
259 )
260 .await
261 .map_err(ApiError::Database)?;
262
263 let version_config: Option<serde_json::Value> = match flow.current_version_id {
270 Some(vid) => match FlowVersion::find_by_id(state.db.pool(), vid).await {
271 Ok(Some(v)) => Some(v.config),
272 Ok(None) => {
273 tracing::warn!(
274 flow_id = %flow.id,
275 version_id = %vid,
276 "flow.current_version_id points at missing FlowVersion",
277 );
278 None
279 }
280 Err(e) => {
281 tracing::warn!(
282 flow_id = %flow.id,
283 error = %e,
284 "failed to load FlowVersion for run payload",
285 );
286 None
287 }
288 },
289 None => None,
290 };
291
292 if let Err(e) = crate::run_queue::enqueue(
293 state.redis.as_ref(),
294 crate::run_queue::EnqueuedJob {
295 run_id: run.id,
296 org_id: run.org_id,
297 source_id: flow.id,
298 kind: &flow.kind,
299 payload: serde_json::json!({
300 "flow_kind": flow.kind,
301 "flow_name": flow.name,
302 "current_version_id": flow.current_version_id,
303 "config": version_config,
304 }),
305 },
306 )
307 .await
308 {
309 tracing::error!(run_id = %run.id, error = %e, "failed to enqueue flow run");
310 }
311
312 Ok(Json(run))
313}
314
315pub async fn list_flow_versions(
317 State(state): State<AppState>,
318 AuthUser(user_id): AuthUser,
319 Path(id): Path<Uuid>,
320 headers: HeaderMap,
321) -> ApiResult<Json<Vec<FlowVersion>>> {
322 load_authorized_flow(&state, user_id, &headers, id).await?;
323 let versions = FlowVersion::list_by_flow(state.db.pool(), id)
324 .await
325 .map_err(ApiError::Database)?;
326 Ok(Json(versions))
327}
328
329pub async fn get_flow_version(
332 State(state): State<AppState>,
333 AuthUser(user_id): AuthUser,
334 Path(version_id): Path<Uuid>,
335 headers: HeaderMap,
336) -> ApiResult<Json<FlowVersion>> {
337 let version = FlowVersion::find_by_id(state.db.pool(), version_id)
338 .await
339 .map_err(ApiError::Database)?
340 .ok_or_else(|| ApiError::InvalidRequest("Flow version not found".into()))?;
341 load_authorized_flow(&state, user_id, &headers, version.flow_id).await?;
343 Ok(Json(version))
344}
345
346async fn authorize_workspace(
347 state: &AppState,
348 user_id: Uuid,
349 headers: &HeaderMap,
350 workspace_id: Uuid,
351) -> ApiResult<()> {
352 let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
353 .await?
354 .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
355 let ctx = resolve_org_context(state, user_id, headers, None)
356 .await
357 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
358 if ctx.org_id != workspace.org_id {
359 return Err(ApiError::InvalidRequest("Workspace not found".into()));
360 }
361 Ok(())
362}
363
364async fn load_authorized_flow(
365 state: &AppState,
366 user_id: Uuid,
367 headers: &HeaderMap,
368 id: Uuid,
369) -> ApiResult<Flow> {
370 let flow = Flow::find_by_id(state.db.pool(), id)
371 .await
372 .map_err(ApiError::Database)?
373 .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
374 let workspace = CloudWorkspace::find_by_id(state.db.pool(), flow.workspace_id)
375 .await?
376 .ok_or_else(|| ApiError::InvalidRequest("Flow not found".into()))?;
377 let ctx = resolve_org_context(state, user_id, headers, None)
378 .await
379 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
380 if ctx.org_id != workspace.org_id {
381 return Err(ApiError::InvalidRequest("Flow not found".into()));
382 }
383 Ok(flow)
384}
385
386fn deserialize_double_option<'de, T, D>(deserializer: D) -> Result<Option<Option<T>>, D::Error>
387where
388 T: Deserialize<'de>,
389 D: serde::Deserializer<'de>,
390{
391 Option::<T>::deserialize(deserializer).map(Some)
392}