Skip to main content

mockforge_registry_server/handlers/
captures.rs

1//! Capture sessions + behavioral-clone model handlers
2//! (cloud-enablement task #6 / Phase 1).
3//!
4//! Phase 1 surface: capture-session CRUD + member management,
5//! clone-model read paths + create-training row. Actual training
6//! worker / replay endpoint / per-capture cloud-shipping land in
7//! follow-up slices.
8//!
9//! Routes:
10//!   GET    /api/v1/workspaces/{workspace_id}/capture-sessions
11//!   POST   /api/v1/workspaces/{workspace_id}/capture-sessions
12//!   PATCH  /api/v1/capture-sessions/{id}/members         (add/remove)
13//!   DELETE /api/v1/capture-sessions/{id}
14//!
15//!   GET    /api/v1/workspaces/{workspace_id}/clone-models
16//!   POST   /api/v1/capture-sessions/{id}/train          (enqueues training)
17//!   GET    /api/v1/clone-models/{id}
18//!   DELETE /api/v1/clone-models/{id}
19
20use axum::{
21    extract::{Path, State},
22    http::HeaderMap,
23    Json,
24};
25use serde::Deserialize;
26use uuid::Uuid;
27
28use mockforge_registry_core::models::test_run::EnqueueTestRun;
29
30use crate::{
31    error::{ApiError, ApiResult},
32    handlers::usage::effective_limits,
33    middleware::{resolve_org_context, AuthUser},
34    models::{CaptureSession, CloneModel, CloudWorkspace, TestRun},
35    AppState,
36};
37
38// --- capture sessions ------------------------------------------------------
39
40/// `GET /api/v1/workspaces/{workspace_id}/capture-sessions`
41pub async fn list_sessions(
42    State(state): State<AppState>,
43    AuthUser(user_id): AuthUser,
44    Path(workspace_id): Path<Uuid>,
45    headers: HeaderMap,
46) -> ApiResult<Json<Vec<CaptureSession>>> {
47    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
48    let rows = CaptureSession::list_by_workspace(state.db.pool(), workspace_id)
49        .await
50        .map_err(ApiError::Database)?;
51    Ok(Json(rows))
52}
53
54#[derive(Debug, Deserialize)]
55pub struct CreateSessionRequest {
56    pub name: String,
57    #[serde(default)]
58    pub description: Option<String>,
59}
60
61/// `POST /api/v1/workspaces/{workspace_id}/capture-sessions`
62pub async fn create_session(
63    State(state): State<AppState>,
64    AuthUser(user_id): AuthUser,
65    Path(workspace_id): Path<Uuid>,
66    headers: HeaderMap,
67    Json(request): Json<CreateSessionRequest>,
68) -> ApiResult<Json<CaptureSession>> {
69    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
70    if request.name.trim().is_empty() {
71        return Err(ApiError::InvalidRequest("name must not be empty".into()));
72    }
73    let row = CaptureSession::create(
74        state.db.pool(),
75        workspace_id,
76        &request.name,
77        request.description.as_deref(),
78        Some(user_id),
79    )
80    .await
81    .map_err(ApiError::Database)?;
82    Ok(Json(row))
83}
84
85#[derive(Debug, Deserialize)]
86#[serde(tag = "op", rename_all = "snake_case")]
87pub enum MembersOp {
88    Add { capture_id: Uuid },
89    Remove { capture_id: Uuid },
90}
91
92/// `PATCH /api/v1/capture-sessions/{id}/members`
93///
94/// Body: `{"op": "add", "capture_id": "..."}` or
95///       `{"op": "remove", "capture_id": "..."}`. Idempotent — repeated
96/// adds/removes are no-ops.
97pub async fn modify_session_members(
98    State(state): State<AppState>,
99    AuthUser(user_id): AuthUser,
100    Path(id): Path<Uuid>,
101    headers: HeaderMap,
102    Json(op): Json<MembersOp>,
103) -> ApiResult<Json<serde_json::Value>> {
104    let session = load_authorized_session(&state, user_id, &headers, id).await?;
105    let changed = match op {
106        MembersOp::Add { capture_id } => {
107            CaptureSession::add_member(state.db.pool(), session.id, capture_id)
108                .await
109                .map_err(ApiError::Database)?
110        }
111        MembersOp::Remove { capture_id } => {
112            CaptureSession::remove_member(state.db.pool(), session.id, capture_id)
113                .await
114                .map_err(ApiError::Database)?
115        }
116    };
117    Ok(Json(serde_json::json!({ "changed": changed })))
118}
119
120/// `DELETE /api/v1/capture-sessions/{id}`
121pub async fn delete_session(
122    State(state): State<AppState>,
123    AuthUser(user_id): AuthUser,
124    Path(id): Path<Uuid>,
125    headers: HeaderMap,
126) -> ApiResult<Json<serde_json::Value>> {
127    load_authorized_session(&state, user_id, &headers, id).await?;
128    let deleted = CaptureSession::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
129    if !deleted {
130        return Err(ApiError::InvalidRequest("Capture session not found".into()));
131    }
132    Ok(Json(serde_json::json!({ "deleted": true })))
133}
134
135// --- clone models ----------------------------------------------------------
136
137/// `GET /api/v1/workspaces/{workspace_id}/clone-models`
138pub async fn list_clone_models(
139    State(state): State<AppState>,
140    AuthUser(user_id): AuthUser,
141    Path(workspace_id): Path<Uuid>,
142    headers: HeaderMap,
143) -> ApiResult<Json<Vec<CloneModel>>> {
144    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
145    let rows = CloneModel::list_by_workspace(state.db.pool(), workspace_id)
146        .await
147        .map_err(ApiError::Database)?;
148    Ok(Json(rows))
149}
150
151#[derive(Debug, Deserialize)]
152pub struct TrainCloneRequest {
153    pub name: String,
154}
155
156/// `POST /api/v1/capture-sessions/{id}/train`
157///
158/// Creates the clone_models row in `training` state AND enqueues a
159/// `behavioral_clone` test_run so the worker actually picks it up. The
160/// CloneTrainExecutor reports back via internal callbacks (status flips
161/// to terminal, runner_seconds metered).
162pub async fn train_clone_from_session(
163    State(state): State<AppState>,
164    AuthUser(user_id): AuthUser,
165    Path(session_id): Path<Uuid>,
166    headers: HeaderMap,
167    Json(request): Json<TrainCloneRequest>,
168) -> ApiResult<Json<CloneModel>> {
169    let session = load_authorized_session(&state, user_id, &headers, session_id).await?;
170    if request.name.trim().is_empty() {
171        return Err(ApiError::InvalidRequest("name must not be empty".into()));
172    }
173
174    let workspace = CloudWorkspace::find_by_id(state.db.pool(), session.workspace_id)
175        .await?
176        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
177
178    let limits = effective_limits(&state, &load_org(&state, workspace.org_id).await?).await?;
179    let max_clones = limits.get("max_clone_models").and_then(|v| v.as_i64()).unwrap_or(0);
180    if max_clones == 0 {
181        return Err(ApiError::ResourceLimitExceeded(
182            "Behavioral cloning is not enabled on this plan".into(),
183        ));
184    }
185
186    let row = CloneModel::create_training(
187        state.db.pool(),
188        workspace.org_id,
189        session.workspace_id,
190        Some(session.id),
191        &request.name,
192    )
193    .await
194    .map_err(ApiError::Database)?;
195
196    let run = TestRun::enqueue(
197        state.db.pool(),
198        EnqueueTestRun {
199            suite_id: row.id,
200            org_id: workspace.org_id,
201            kind: "behavioral_clone",
202            triggered_by: "manual",
203            triggered_by_user: Some(user_id),
204            git_ref: None,
205            git_sha: None,
206        },
207    )
208    .await
209    .map_err(ApiError::Database)?;
210
211    if let Err(e) = crate::run_queue::enqueue(
212        state.redis.as_ref(),
213        crate::run_queue::EnqueuedJob {
214            run_id: run.id,
215            org_id: run.org_id,
216            source_id: row.id,
217            kind: "behavioral_clone",
218            payload: serde_json::json!({
219                "session_id": session.id,
220                "clone_model_id": row.id,
221                "name": request.name,
222            }),
223        },
224    )
225    .await
226    {
227        tracing::error!(run_id = %run.id, error = %e, "failed to enqueue behavioral_clone run");
228    }
229
230    Ok(Json(row))
231}
232
233/// `POST /api/v1/capture-sessions/{id}/replay`
234///
235/// Triggers a synthetic replay of the capture session against a target.
236/// Reuses the test_runs lifecycle with kind='replay'. The runner-side
237/// ReplayExecutor synthesizes per-capture events until real impl
238/// (real HTTP replay against `target_url`) lands.
239#[derive(Debug, Deserialize)]
240pub struct ReplaySessionRequest {
241    #[serde(default)]
242    pub target_url: Option<String>,
243    /// How many synthetic captures the executor should pretend to replay.
244    /// Optional — defaults to 5 on the runner side. Ignored once real
245    /// replay lands.
246    #[serde(default)]
247    pub synthetic_captures: Option<u32>,
248}
249
250pub async fn replay_capture_session(
251    State(state): State<AppState>,
252    AuthUser(user_id): AuthUser,
253    Path(session_id): Path<Uuid>,
254    headers: HeaderMap,
255    Json(request): Json<ReplaySessionRequest>,
256) -> ApiResult<Json<TestRun>> {
257    let session = load_authorized_session(&state, user_id, &headers, session_id).await?;
258    let workspace = CloudWorkspace::find_by_id(state.db.pool(), session.workspace_id)
259        .await?
260        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
261
262    let run = TestRun::enqueue(
263        state.db.pool(),
264        EnqueueTestRun {
265            suite_id: session.id,
266            org_id: workspace.org_id,
267            kind: "replay",
268            triggered_by: "manual",
269            triggered_by_user: Some(user_id),
270            git_ref: None,
271            git_sha: None,
272        },
273    )
274    .await
275    .map_err(ApiError::Database)?;
276
277    let mut payload = serde_json::Map::new();
278    payload.insert("session_id".into(), serde_json::json!(session.id));
279    if let Some(url) = request.target_url.as_deref() {
280        payload.insert("target_url".into(), serde_json::Value::String(url.to_string()));
281    }
282    if let Some(n) = request.synthetic_captures {
283        payload.insert("synthetic_captures".into(), serde_json::json!(n));
284    }
285
286    if let Err(e) = crate::run_queue::enqueue(
287        state.redis.as_ref(),
288        crate::run_queue::EnqueuedJob {
289            run_id: run.id,
290            org_id: run.org_id,
291            source_id: session.id,
292            kind: "replay",
293            payload: serde_json::Value::Object(payload),
294        },
295    )
296    .await
297    {
298        tracing::error!(run_id = %run.id, error = %e, "failed to enqueue replay run");
299    }
300
301    Ok(Json(run))
302}
303
304/// `GET /api/v1/clone-models/{id}`
305pub async fn get_clone_model(
306    State(state): State<AppState>,
307    AuthUser(user_id): AuthUser,
308    Path(id): Path<Uuid>,
309    headers: HeaderMap,
310) -> ApiResult<Json<CloneModel>> {
311    let model = load_authorized_clone(&state, user_id, &headers, id).await?;
312    Ok(Json(model))
313}
314
315/// `DELETE /api/v1/clone-models/{id}`
316pub async fn delete_clone_model(
317    State(state): State<AppState>,
318    AuthUser(user_id): AuthUser,
319    Path(id): Path<Uuid>,
320    headers: HeaderMap,
321) -> ApiResult<Json<serde_json::Value>> {
322    load_authorized_clone(&state, user_id, &headers, id).await?;
323    let deleted = CloneModel::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
324    if !deleted {
325        return Err(ApiError::InvalidRequest("Clone model not found".into()));
326    }
327    Ok(Json(serde_json::json!({ "deleted": true })))
328}
329
330async fn authorize_workspace(
331    state: &AppState,
332    user_id: Uuid,
333    headers: &HeaderMap,
334    workspace_id: Uuid,
335) -> ApiResult<()> {
336    let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
337        .await?
338        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
339    let ctx = resolve_org_context(state, user_id, headers, None)
340        .await
341        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
342    if ctx.org_id != workspace.org_id {
343        return Err(ApiError::InvalidRequest("Workspace not found".into()));
344    }
345    Ok(())
346}
347
348async fn load_authorized_session(
349    state: &AppState,
350    user_id: Uuid,
351    headers: &HeaderMap,
352    id: Uuid,
353) -> ApiResult<CaptureSession> {
354    let session = CaptureSession::find_by_id(state.db.pool(), id)
355        .await
356        .map_err(ApiError::Database)?
357        .ok_or_else(|| ApiError::InvalidRequest("Capture session not found".into()))?;
358    authorize_workspace(state, user_id, headers, session.workspace_id).await?;
359    Ok(session)
360}
361
362async fn load_authorized_clone(
363    state: &AppState,
364    user_id: Uuid,
365    headers: &HeaderMap,
366    id: Uuid,
367) -> ApiResult<CloneModel> {
368    let model = CloneModel::find_by_id(state.db.pool(), id)
369        .await
370        .map_err(ApiError::Database)?
371        .ok_or_else(|| ApiError::InvalidRequest("Clone model not found".into()))?;
372    authorize_workspace(state, user_id, headers, model.workspace_id).await?;
373    Ok(model)
374}
375
376async fn load_org(
377    state: &AppState,
378    org_id: Uuid,
379) -> ApiResult<mockforge_registry_core::models::Organization> {
380    use mockforge_registry_core::models::Organization;
381    Organization::find_by_id(state.db.pool(), org_id)
382        .await
383        .map_err(|_| ApiError::Internal(anyhow::anyhow!("DB error loading org")))?
384        .ok_or_else(|| ApiError::InvalidRequest("Organization not found".into()))
385}