Skip to main content

mockforge_registry_server/handlers/
request_verification.rs

1//! Cloud-mode request verification — WireMock-style assertions against the
2//! workspace's `runtime_captures` table.
3//!
4//! Mirrors the local `/__mockforge/verification/*` surface from
5//! `mockforge-core`, but sources its log entries from the per-workspace
6//! recorder pipeline instead of the in-process ring buffer. The matcher
7//! itself is the same code (`mockforge_core::verification`) so cloud and
8//! local results stay bit-for-bit consistent.
9//!
10//! Note on data availability: only deployments that have the recorder
11//! enabled write to `runtime_captures`. The UI surfaces this so users
12//! aren't surprised by a "0 matches" result against a deployment that
13//! simply isn't capturing.
14
15use axum::{
16    extract::{Path, State},
17    http::HeaderMap,
18    Json,
19};
20use chrono::{DateTime, Duration, Utc};
21use mockforge_core::{
22    request_logger::RequestLogEntry,
23    verification::{
24        verify_entries, verify_sequence_entries, VerificationCount, VerificationRequest,
25        VerificationResult,
26    },
27};
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use uuid::Uuid;
31
32use crate::{
33    error::{ApiError, ApiResult},
34    middleware::{resolve_org_context, AuthUser},
35    models::CloudWorkspace,
36    AppState,
37};
38
/// Hard cap on how many capture rows a single verification call will
/// pull. Picked to be generous enough for typical staging windows while
/// keeping a runaway request from materialising the entire workspace
/// retention into memory.
///
/// `load_captures` binds this as the SQL `LIMIT` of a newest-first query,
/// so when a window holds more rows than this the oldest ones are dropped
/// without any signal to the caller.
const MAX_CAPTURE_ROWS: i64 = 5000;
44
/// Default lookback if the caller omits `since`. One hour matches what
/// the cloud UI defaults to and stays well inside even the Free-tier
/// 24h retention.
///
/// `resolve_window` subtracts this from the resolved `until`, so when the
/// caller supplies `until` the default window ends there rather than at
/// the current time.
const DEFAULT_LOOKBACK: Duration = Duration::hours(1);
49
/// Hard ceiling on lookback regardless of caller input. Free retention
/// is 24h, so anything beyond that would be empty for the worst-case
/// tier anyway, and we don't want a misconfigured client to scan the
/// full Pro/Team retention by accident.
///
/// `resolve_window` rejects (it does not shrink) any window whose `since`
/// lies more than this far before `until`.
const MAX_LOOKBACK: Duration = Duration::hours(24);
55
/// Optional time bounds shared by the verification request bodies.
///
/// The request structs embed this with `#[serde(flatten)]`, so `since`
/// and `until` sit at the top level of the JSON payload next to the
/// pattern fields. Both bounds are inclusive.
#[derive(Debug, Deserialize)]
pub struct TimeWindow {
    /// RFC3339 timestamp; only captures with `occurred_at >= since` are
    /// considered. Defaults to one hour (`DEFAULT_LOOKBACK`) before the
    /// resolved `until`, which equals `now() - 1h` only when `until` is
    /// omitted as well.
    #[serde(default)]
    pub since: Option<DateTime<Utc>>,
    /// RFC3339 timestamp; only captures with `occurred_at <= until` are
    /// considered. Defaults to `now()`.
    #[serde(default)]
    pub until: Option<DateTime<Utc>>,
}
67
/// JSON body accepted by the `verify` handler.
#[derive(Debug, Deserialize)]
pub struct VerifyBody {
    /// Request pattern handed to the matcher.
    pub pattern: VerificationRequest,
    /// Count expectation the matching captures are checked against.
    pub expected: VerificationCount,
    /// Optional `since` / `until` bounds, read from the top level of the
    /// payload because the field is flattened.
    #[serde(flatten)]
    pub window: TimeWindow,
}
75
/// JSON body accepted by the `count` handler.
#[derive(Debug, Deserialize)]
pub struct CountBody {
    /// Request pattern whose matches are counted.
    pub pattern: VerificationRequest,
    /// Optional `since` / `until` bounds, read from the top level of the
    /// payload because the field is flattened.
    #[serde(flatten)]
    pub window: TimeWindow,
}
82
/// JSON response returned by the `count` handler.
#[derive(Debug, Serialize)]
pub struct CountResponse {
    /// Number of matching captures reported by the matcher for the
    /// resolved window.
    pub count: usize,
}
87
/// JSON body accepted by the `sequence` handler.
#[derive(Debug, Deserialize)]
pub struct SequenceBody {
    /// Request patterns passed, in the order given, to the sequence
    /// matcher.
    pub patterns: Vec<VerificationRequest>,
    /// Optional `since` / `until` bounds, read from the top level of the
    /// payload because the field is flattened.
    #[serde(flatten)]
    pub window: TimeWindow,
}
94
/// JSON body accepted by the `never` handler.
#[derive(Debug, Deserialize)]
pub struct NeverBody {
    /// Request pattern that is verified with `VerificationCount::Never`.
    pub pattern: VerificationRequest,
    /// Optional `since` / `until` bounds, read from the top level of the
    /// payload because the field is flattened.
    #[serde(flatten)]
    pub window: TimeWindow,
}
101
/// JSON body accepted by the `at_least` handler.
#[derive(Debug, Deserialize)]
pub struct AtLeastBody {
    /// Request pattern handed to the matcher.
    pub pattern: VerificationRequest,
    /// Lower bound forwarded as `VerificationCount::AtLeast(min)`.
    pub min: usize,
    /// Optional `since` / `until` bounds, read from the top level of the
    /// payload because the field is flattened.
    #[serde(flatten)]
    pub window: TimeWindow,
}
109
110async fn require_workspace(
111    state: &AppState,
112    user_id: Uuid,
113    headers: &HeaderMap,
114    workspace_id: Uuid,
115) -> ApiResult<CloudWorkspace> {
116    let org_ctx = resolve_org_context(state, user_id, headers, None)
117        .await
118        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
119
120    let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
121        .await?
122        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".to_string()))?;
123
124    if workspace.org_id != org_ctx.org_id {
125        return Err(ApiError::InvalidRequest(
126            "Workspace does not belong to this organization".to_string(),
127        ));
128    }
129
130    Ok(workspace)
131}
132
133/// Resolve the time window, clamping to `MAX_LOOKBACK` and rejecting
134/// inverted ranges.
135fn resolve_window(window: &TimeWindow) -> ApiResult<(DateTime<Utc>, DateTime<Utc>)> {
136    let now = Utc::now();
137    let until = window.until.unwrap_or(now);
138    let since = window.since.unwrap_or(until - DEFAULT_LOOKBACK);
139
140    if since > until {
141        return Err(ApiError::InvalidRequest("`since` must be earlier than `until`".to_string()));
142    }
143
144    let max_since = until - MAX_LOOKBACK;
145    if since < max_since {
146        return Err(ApiError::InvalidRequest(format!(
147            "Window too large: max lookback is {} hours",
148            MAX_LOOKBACK.num_hours()
149        )));
150    }
151
152    Ok((since, until))
153}
154
/// Pulled column subset from `runtime_captures`. Mirrors the shape we
/// need to materialise a `RequestLogEntry` — anything outside this
/// columns list (response_body, tags, etc.) is intentionally dropped
/// because the matcher doesn't consult it.
///
/// Field names must match the column names selected in `load_captures`,
/// since the struct is decoded through `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct CaptureRow {
    /// Capture timestamp; the column the time window filters and sorts on.
    occurred_at: DateTime<Utc>,
    method: String,
    path: String,
    /// JSON text that `row_to_entry` parses into a string map; absent or
    /// unparseable values become an empty map there.
    query_params: Option<String>,
    /// JSON text that `row_to_entry` parses into a string map; unparseable
    /// values become an empty map there.
    request_headers: String,
    /// Raw request body, forwarded as `metadata["request_body"]`.
    request_body: Option<String>,
    duration_ms: Option<i64>,
    status_code: Option<i32>,
    client_ip: Option<String>,
    response_size_bytes: Option<i64>,
}
172
173async fn load_captures(
174    state: &AppState,
175    workspace_id: Uuid,
176    since: DateTime<Utc>,
177    until: DateTime<Utc>,
178) -> ApiResult<Vec<CaptureRow>> {
179    sqlx::query_as::<_, CaptureRow>(
180        r#"
181        SELECT occurred_at,
182               method,
183               path,
184               query_params,
185               request_headers,
186               request_body,
187               duration_ms,
188               status_code,
189               client_ip,
190               response_size_bytes
191          FROM runtime_captures
192         WHERE workspace_id = $1
193           AND occurred_at >= $2
194           AND occurred_at <= $3
195         ORDER BY occurred_at DESC
196         LIMIT $4
197        "#,
198    )
199    .bind(workspace_id)
200    .bind(since)
201    .bind(until)
202    .bind(MAX_CAPTURE_ROWS)
203    .fetch_all(state.db.pool())
204    .await
205    .map_err(ApiError::Database)
206}
207
208/// Convert a capture row into the in-memory shape the local matcher
209/// expects. The body (if any) is stashed in `metadata["request_body"]`
210/// because that's where `mockforge_core::verification::matches_body_pattern`
211/// already looks.
212fn row_to_entry(row: CaptureRow) -> RequestLogEntry {
213    let headers: HashMap<String, String> =
214        serde_json::from_str(&row.request_headers).unwrap_or_default();
215    let query_params: HashMap<String, String> = row
216        .query_params
217        .as_deref()
218        .and_then(|s| serde_json::from_str(s).ok())
219        .unwrap_or_default();
220
221    let mut metadata = HashMap::new();
222    if let Some(body) = row.request_body {
223        metadata.insert("request_body".to_string(), body);
224    }
225
226    RequestLogEntry {
227        id: format!("capture-{}", row.occurred_at.timestamp_nanos_opt().unwrap_or(0)),
228        timestamp: row.occurred_at,
229        server_type: "HTTP".to_string(),
230        method: row.method,
231        path: row.path,
232        status_code: row.status_code.unwrap_or(0).max(0) as u16,
233        response_time_ms: row.duration_ms.unwrap_or(0).max(0) as u64,
234        client_ip: row.client_ip,
235        user_agent: None,
236        headers,
237        query_params,
238        response_size_bytes: row.response_size_bytes.unwrap_or(0).max(0) as u64,
239        error_message: None,
240        metadata,
241        reality_metadata: None,
242    }
243}
244
245/// `POST /api/v1/workspaces/{workspace_id}/request-log/verify`
246pub async fn verify(
247    State(state): State<AppState>,
248    AuthUser(user_id): AuthUser,
249    headers: HeaderMap,
250    Path(workspace_id): Path<Uuid>,
251    Json(body): Json<VerifyBody>,
252) -> ApiResult<Json<VerificationResult>> {
253    require_workspace(&state, user_id, &headers, workspace_id).await?;
254    let (since, until) = resolve_window(&body.window)?;
255    let rows = load_captures(&state, workspace_id, since, until).await?;
256    let entries: Vec<RequestLogEntry> = rows.into_iter().map(row_to_entry).collect();
257    Ok(Json(verify_entries(&entries, &body.pattern, body.expected)))
258}
259
260/// `POST /api/v1/workspaces/{workspace_id}/request-log/count`
261pub async fn count(
262    State(state): State<AppState>,
263    AuthUser(user_id): AuthUser,
264    headers: HeaderMap,
265    Path(workspace_id): Path<Uuid>,
266    Json(body): Json<CountBody>,
267) -> ApiResult<Json<CountResponse>> {
268    require_workspace(&state, user_id, &headers, workspace_id).await?;
269    let (since, until) = resolve_window(&body.window)?;
270    let rows = load_captures(&state, workspace_id, since, until).await?;
271    let entries: Vec<RequestLogEntry> = rows.into_iter().map(row_to_entry).collect();
272    let result = verify_entries(&entries, &body.pattern, VerificationCount::AtLeast(0));
273    Ok(Json(CountResponse {
274        count: result.count,
275    }))
276}
277
278/// `POST /api/v1/workspaces/{workspace_id}/request-log/sequence`
279pub async fn sequence(
280    State(state): State<AppState>,
281    AuthUser(user_id): AuthUser,
282    headers: HeaderMap,
283    Path(workspace_id): Path<Uuid>,
284    Json(body): Json<SequenceBody>,
285) -> ApiResult<Json<VerificationResult>> {
286    require_workspace(&state, user_id, &headers, workspace_id).await?;
287    let (since, until) = resolve_window(&body.window)?;
288    let rows = load_captures(&state, workspace_id, since, until).await?;
289    // Captures come back DESC; sequence verification expects chronological order.
290    let entries: Vec<RequestLogEntry> = rows.into_iter().rev().map(row_to_entry).collect();
291    Ok(Json(verify_sequence_entries(&entries, &body.patterns)))
292}
293
294/// `POST /api/v1/workspaces/{workspace_id}/request-log/never`
295pub async fn never(
296    State(state): State<AppState>,
297    AuthUser(user_id): AuthUser,
298    headers: HeaderMap,
299    Path(workspace_id): Path<Uuid>,
300    Json(body): Json<NeverBody>,
301) -> ApiResult<Json<VerificationResult>> {
302    require_workspace(&state, user_id, &headers, workspace_id).await?;
303    let (since, until) = resolve_window(&body.window)?;
304    let rows = load_captures(&state, workspace_id, since, until).await?;
305    let entries: Vec<RequestLogEntry> = rows.into_iter().map(row_to_entry).collect();
306    Ok(Json(verify_entries(&entries, &body.pattern, VerificationCount::Never)))
307}
308
309/// `POST /api/v1/workspaces/{workspace_id}/request-log/at-least`
310pub async fn at_least(
311    State(state): State<AppState>,
312    AuthUser(user_id): AuthUser,
313    headers: HeaderMap,
314    Path(workspace_id): Path<Uuid>,
315    Json(body): Json<AtLeastBody>,
316) -> ApiResult<Json<VerificationResult>> {
317    require_workspace(&state, user_id, &headers, workspace_id).await?;
318    let (since, until) = resolve_window(&body.window)?;
319    let rows = load_captures(&state, workspace_id, since, until).await?;
320    let entries: Vec<RequestLogEntry> = rows.into_iter().map(row_to_entry).collect();
321    Ok(Json(verify_entries(
322        &entries,
323        &body.pattern,
324        VerificationCount::AtLeast(body.min),
325    )))
326}
327
/// JSON response returned by the `status` handler.
#[derive(Debug, Serialize)]
pub struct WorkspaceCaptureStatus {
    /// `true` when `recent_capture_count` is non-zero, i.e. at least one
    /// capture row landed in the workspace within the recent window that
    /// `status` queries (one hour). Older captures do not set this flag.
    /// The UI uses this to surface the "enable recording" hint when the
    /// verification feature would silently return zeros.
    pub has_captures: bool,
    /// Total capture rows in the workspace within the default lookback
    /// window. Useful as a sanity check in the UI.
    pub recent_capture_count: i64,
}
338
339/// `GET /api/v1/workspaces/{workspace_id}/request-log/status`
340///
341/// Lightweight surface used by the cloud Verification page to decide
342/// whether to show the "no recordings yet" empty state.
343pub async fn status(
344    State(state): State<AppState>,
345    AuthUser(user_id): AuthUser,
346    headers: HeaderMap,
347    Path(workspace_id): Path<Uuid>,
348) -> ApiResult<Json<WorkspaceCaptureStatus>> {
349    require_workspace(&state, user_id, &headers, workspace_id).await?;
350
351    let recent_capture_count: i64 = sqlx::query_scalar(
352        r#"
353        SELECT COUNT(*)
354          FROM runtime_captures
355         WHERE workspace_id = $1
356           AND occurred_at >= NOW() - INTERVAL '1 hour'
357        "#,
358    )
359    .bind(workspace_id)
360    .fetch_one(state.db.pool())
361    .await
362    .map_err(ApiError::Database)?;
363
364    Ok(Json(WorkspaceCaptureStatus {
365        has_captures: recent_capture_count > 0,
366        recent_capture_count,
367    }))
368}
369
/// Unit tests for the pure helpers (`resolve_window`, `row_to_entry`).
/// The handlers themselves need a database and are not covered here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Row with every optional column unset and empty-object headers.
    fn bare_row() -> CaptureRow {
        CaptureRow {
            occurred_at: Utc::now(),
            method: "GET".to_string(),
            path: "/".to_string(),
            query_params: None,
            request_headers: "{}".to_string(),
            request_body: None,
            duration_ms: None,
            status_code: None,
            client_ip: None,
            response_size_bytes: None,
        }
    }

    #[test]
    fn resolve_window_uses_defaults() {
        let w = TimeWindow {
            since: None,
            until: None,
        };
        let (since, until) = resolve_window(&w).unwrap();
        let span = until - since;
        // Should default to roughly DEFAULT_LOOKBACK (allow a few seconds of jitter).
        let drift = (span - DEFAULT_LOOKBACK).num_seconds().abs();
        assert!(drift < 5, "expected ~1h window, got {}s", span.num_seconds());
    }

    #[test]
    fn resolve_window_anchors_default_since_to_until() {
        // With only `until` supplied, the default window ends at `until`,
        // not at the current time.
        let until = Utc::now() - Duration::hours(3);
        let w = TimeWindow {
            since: None,
            until: Some(until),
        };
        let (since, resolved_until) = resolve_window(&w).unwrap();
        assert_eq!(resolved_until, until);
        assert_eq!(since, until - DEFAULT_LOOKBACK);
    }

    #[test]
    fn resolve_window_rejects_inverted_range() {
        let now = Utc::now();
        let w = TimeWindow {
            since: Some(now),
            until: Some(now - Duration::minutes(5)),
        };
        assert!(resolve_window(&w).is_err());
    }

    #[test]
    fn resolve_window_accepts_equal_bounds() {
        // Only `since > until` is rejected; a zero-length window is valid.
        let now = Utc::now();
        let w = TimeWindow {
            since: Some(now),
            until: Some(now),
        };
        let (since, until) = resolve_window(&w).unwrap();
        assert_eq!(since, until);
    }

    #[test]
    fn resolve_window_rejects_too_large() {
        let now = Utc::now();
        let w = TimeWindow {
            since: Some(now - Duration::hours(48)),
            until: Some(now),
        };
        assert!(resolve_window(&w).is_err());
    }

    #[test]
    fn resolve_window_accepts_exactly_max_lookback() {
        // The limit is inclusive: a window of exactly MAX_LOOKBACK passes.
        let now = Utc::now();
        let w = TimeWindow {
            since: Some(now - MAX_LOOKBACK),
            until: Some(now),
        };
        let (since, until) = resolve_window(&w).unwrap();
        assert_eq!(until - since, MAX_LOOKBACK);
    }

    #[test]
    fn row_to_entry_extracts_headers_and_body() {
        let row = CaptureRow {
            occurred_at: Utc::now(),
            method: "POST".to_string(),
            path: "/api/checkout".to_string(),
            query_params: Some(r#"{"ref":"abc"}"#.to_string()),
            request_headers: r#"{"content-type":"application/json"}"#.to_string(),
            request_body: Some(r#"{"item":"widget"}"#.to_string()),
            duration_ms: Some(42),
            status_code: Some(201),
            client_ip: Some("10.0.0.1".to_string()),
            response_size_bytes: Some(128),
        };
        let entry = row_to_entry(row);
        assert_eq!(entry.method, "POST");
        assert_eq!(entry.headers.get("content-type").map(String::as_str), Some("application/json"));
        assert_eq!(entry.query_params.get("ref").map(String::as_str), Some("abc"));
        assert_eq!(
            entry.metadata.get("request_body").map(String::as_str),
            Some(r#"{"item":"widget"}"#)
        );
        assert_eq!(entry.status_code, 201);
        assert_eq!(entry.response_time_ms, 42);
    }

    #[test]
    fn row_to_entry_handles_invalid_header_json() {
        let row = CaptureRow {
            request_headers: "not valid json".to_string(),
            ..bare_row()
        };
        let entry = row_to_entry(row);
        assert!(entry.headers.is_empty());
        assert!(entry.query_params.is_empty());
        assert!(!entry.metadata.contains_key("request_body"));
    }

    #[test]
    fn row_to_entry_zeroes_missing_and_negative_numbers() {
        let missing = row_to_entry(bare_row());
        assert_eq!(missing.status_code, 0);
        assert_eq!(missing.response_time_ms, 0);
        assert_eq!(missing.response_size_bytes, 0);

        let negative = row_to_entry(CaptureRow {
            duration_ms: Some(-5),
            status_code: Some(-1),
            response_size_bytes: Some(-9),
            ..bare_row()
        });
        assert_eq!(negative.status_code, 0);
        assert_eq!(negative.response_time_ms, 0);
        assert_eq!(negative.response_size_bytes, 0);
    }
}
452}