use axum::{
    extract::{Path, Query, State},
    http::HeaderMap,
    Json,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::{
    error::{ApiError, ApiResult},
    models::{FitnessFunction, TestRun, UsageCounter},
    AppState,
};
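/// Authenticates service-to-service calls against the shared secret in
/// `MOCKFORGE_INTERNAL_API_TOKEN`. Expects `Authorization: Bearer <token>`;
/// a missing header and a mismatched token both produce the same generic
/// error so callers cannot distinguish the two, and the comparison runs
/// in constant time to avoid leaking the token through timing.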
fn require_internal_auth(headers: &HeaderMap) -> ApiResult<()> {
let configured = match std::env::var("MOCKFORGE_INTERNAL_API_TOKEN") {
Ok(v) if !v.is_empty() => v,
_ => {
return Err(ApiError::Internal(anyhow::anyhow!(
"MOCKFORGE_INTERNAL_API_TOKEN not configured"
)));
}
};
    let provided = headers
        .get(axum::http::header::AUTHORIZATION)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.strip_prefix("Bearer "))
        .ok_or_else(|| ApiError::InvalidRequest("Unauthorized".into()))?;
    if !constant_time_eq(provided.as_bytes(), configured.as_bytes()) {
        return Err(ApiError::InvalidRequest("Unauthorized".into()));
    }
Ok(())
}
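/// Compares two byte slices in time independent of where they differ.
/// The up-front length check short-circuits, so only the length of the
/// configured token (not its contents) can leak through timing.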
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
if a.len() != b.len() {
return false;
}
let mut diff = 0u8;
for (x, y) in a.iter().zip(b.iter()) {
diff |= x ^ y;
}
diff == 0
}
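/// Runner callback: marks a test run as running. If `mark_running`
/// updates no row (e.g. an unknown id), responds with `{"status": "noop"}`
/// rather than an error, so runner retries stay harmless.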
pub async fn run_started(
State(state): State<AppState>,
Path(id): Path<Uuid>,
headers: HeaderMap,
) -> ApiResult<Json<serde_json::Value>> {
require_internal_auth(&headers)?;
    let updated = TestRun::mark_running(state.db.pool(), id)
        .await
        .map_err(ApiError::Database)?;
if updated.is_none() {
return Ok(Json(serde_json::json!({ "status": "noop" })));
}
Ok(Json(serde_json::json!({ "status": "running" })))
}
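/// A single ordered event streamed back by the runner. `seq` is the
/// runner-assigned sequence number used for deduplication.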
#[derive(Debug, Deserialize)]
pub struct EventBody {
pub seq: i32,
pub event_type: String,
pub payload: serde_json::Value,
}
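/// Appends a runner event to `test_run_events`. The `(run_id, seq)`
/// conflict target makes replays of the same event a no-op.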
pub async fn run_event(
State(state): State<AppState>,
Path(id): Path<Uuid>,
headers: HeaderMap,
Json(body): Json<EventBody>,
) -> ApiResult<Json<serde_json::Value>> {
require_internal_auth(&headers)?;
    let result = sqlx::query(
        "INSERT INTO test_run_events (run_id, seq, event_type, payload) \
         VALUES ($1, $2, $3, $4) \
         ON CONFLICT (run_id, seq) DO NOTHING",
    )
    .bind(id)
    .bind(body.seq)
    .bind(&body.event_type)
    .bind(&body.payload)
    .execute(state.db.pool())
    .await
    .map_err(ApiError::Database)?;
    // rows_affected() == 0 means this (run_id, seq) pair was already
    // recorded and the insert was skipped by ON CONFLICT DO NOTHING.
    Ok(Json(serde_json::json!({ "appended": result.rows_affected() > 0 })))
}
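/// Terminal payload posted by the runner once a run completes.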
#[derive(Debug, Deserialize)]
pub struct FinishBody {
pub status: String,
pub runner_seconds: i32,
#[serde(default)]
pub summary: Option<serde_json::Value>,
}
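/// Runner callback for run completion: validates the terminal status,
/// records consumed runner seconds against the org's usage counter, and
/// mirrors the outcome onto the owning resource. Mirroring failures are
/// logged but do not fail the request, since the run itself has already
/// been persisted.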
pub async fn run_finished(
State(state): State<AppState>,
Path(id): Path<Uuid>,
headers: HeaderMap,
Json(body): Json<FinishBody>,
) -> ApiResult<Json<serde_json::Value>> {
require_internal_auth(&headers)?;
if !matches!(body.status.as_str(), "passed" | "failed" | "cancelled" | "errored") {
return Err(ApiError::InvalidRequest(
"status must be passed | failed | cancelled | errored".into(),
));
}
if body.runner_seconds < 0 {
return Err(ApiError::InvalidRequest("runner_seconds must be non-negative".into()));
}
let run = TestRun::mark_finished(
state.db.pool(),
id,
&body.status,
body.runner_seconds,
body.summary.as_ref(),
)
.await
.map_err(ApiError::Database)?;
if let Some(run) = run {
if body.runner_seconds > 0 {
UsageCounter::increment_runner_seconds(
state.db.pool(),
run.org_id,
body.runner_seconds as i64,
)
.await
.map_err(ApiError::Database)?;
}
if let Err(e) = mirror_kind_status(&state, &run, body.summary.as_ref()).await {
tracing::error!(
run_id = %run.id,
kind = %run.kind,
error = %e,
"failed to mirror run status onto owning resource — UI may show stale status",
);
}
return Ok(Json(serde_json::json!({
"status": run.status,
"runner_seconds": run.runner_seconds,
})));
}
Ok(Json(serde_json::json!({ "status": "noop" })))
}
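/// Propagates a finished run's outcome onto the resource identified by
/// `run.kind` and `run.suite_id` (snapshot, clone model, chaos report,
/// smoke/fitness incident). Unknown kinds fall through silently so new
/// run types can ship without touching this handler.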
async fn mirror_kind_status(
state: &AppState,
run: &TestRun,
summary: Option<&serde_json::Value>,
) -> sqlx::Result<()> {
use mockforge_registry_core::models::chaos::CreateChaosCampaignReport;
use mockforge_registry_core::models::incident::RaiseIncidentInput;
use mockforge_registry_core::models::{ChaosCampaignReport, CloneModel, Incident, Snapshot};
let pool = state.db.pool();
match run.kind.as_str() {
"snapshot_capture" => {
if run.status == "passed" {
let storage_url = summary
.and_then(|s| s.get("storage_url"))
.and_then(|v| v.as_str())
.unwrap_or("synthetic://snapshot");
let size_bytes =
summary.and_then(|s| s.get("size_bytes")).and_then(|v| v.as_i64()).unwrap_or(0);
let manifest = summary.cloned().unwrap_or_else(|| serde_json::json!({}));
Snapshot::mark_ready(pool, run.suite_id, storage_url, size_bytes, &manifest)
.await?;
} else {
Snapshot::mark_failed(pool, run.suite_id).await?;
}
}
"behavioral_clone" => {
if run.status == "passed" {
let artifact_url = summary
.and_then(|s| s.get("artifact_url"))
.and_then(|v| v.as_str())
.unwrap_or("synthetic://clone-model");
let metrics = summary.cloned().unwrap_or_else(|| serde_json::json!({}));
CloneModel::mark_ready(
pool,
run.suite_id,
artifact_url,
&metrics,
run.runner_seconds.unwrap_or(0),
)
.await?;
} else {
CloneModel::mark_failed(pool, run.suite_id).await?;
}
}
"chaos_campaign" => {
            let aborted = run.status != "passed";
let abort_reason: Option<String> = if aborted {
summary
.and_then(|s| s.get("abort_reason"))
.and_then(|v| v.as_str())
.map(str::to_string)
.or_else(|| Some(run.status.clone()))
} else {
None
};
let fault_count = summary
.and_then(|s| s.get("fault_count"))
.and_then(|v| v.as_i64())
.unwrap_or(0)
.clamp(0, i32::MAX as i64) as i32;
let recommendations = summary.and_then(|s| s.get("recommendations").cloned());
ChaosCampaignReport::create(
pool,
CreateChaosCampaignReport {
campaign_id: run.suite_id,
run_id: run.id,
fault_count,
aborted,
abort_reason: abort_reason.as_deref(),
summary,
recommendations: recommendations.as_ref(),
},
)
.await?;
}
"smoke" => {
if !matches!(run.status.as_str(), "passed" | "cancelled") {
let failed_count =
summary.and_then(|s| s.get("failed")).and_then(|v| v.as_i64()).unwrap_or(0);
let total_routes = summary
.and_then(|s| s.get("total_routes"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
let base_url =
summary.and_then(|s| s.get("base_url")).and_then(|v| v.as_str()).unwrap_or("");
let severity = if run.status == "errored" {
"medium"
} else {
"high"
};
let title = if run.status == "errored" {
"Smoke test errored before probing routes".to_string()
} else if total_routes > 0 {
format!("{}/{} routes failed smoke test", failed_count.max(1), total_routes)
} else {
"Smoke test failed".to_string()
};
let dedupe_key = run.suite_id.to_string();
let source_ref = run.id.to_string();
let description = serde_json::json!({
"deployment_id": run.suite_id,
"run_id": run.id,
"run_status": run.status,
"base_url": base_url,
"total_routes": total_routes,
"failed": failed_count,
})
.to_string();
Incident::raise(
pool,
RaiseIncidentInput {
org_id: run.org_id,
workspace_id: None,
source: "hosted_mock_smoke",
source_ref: Some(&source_ref),
dedupe_key: &dedupe_key,
severity,
title: &title,
description: Some(&description),
},
)
.await?;
}
}
"fitness_evaluation" => {
let measured_value =
summary.and_then(|s| s.get("measured_value")).and_then(|v| v.as_f64());
let threshold_value =
summary.and_then(|s| s.get("threshold_value")).and_then(|v| v.as_f64());
let eval_status = match run.status.as_str() {
"passed" => "pass",
"failed" => "fail",
_ => "unknown",
};
if let Err(e) = FitnessFunction::record_evaluation(
pool,
run.suite_id,
eval_status,
measured_value,
threshold_value,
)
.await
{
tracing::warn!(
run_id = %run.id,
function_id = %run.suite_id,
error = %e,
"failed to record fitness evaluation history",
);
}
if !matches!(run.status.as_str(), "passed" | "cancelled") {
let breaking_count = summary
.and_then(|s| s.get("breaking_count"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
let findings_count = summary
.and_then(|s| s.get("findings_count"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
let function_name = summary
.and_then(|s| s.get("function_name"))
.and_then(|v| v.as_str())
.unwrap_or("");
let severity = if run.status == "errored" {
"medium"
} else {
"high"
};
let title = if run.status == "errored" {
"Fitness function evaluation errored".to_string()
} else if !function_name.is_empty() {
format!("Fitness function '{}' failed", function_name)
} else if breaking_count > 0 {
format!(
"{} breaking fitness violation{}",
breaking_count,
if breaking_count == 1 { "" } else { "s" },
)
} else {
"Fitness function failed".to_string()
};
let dedupe_key = run.suite_id.to_string();
let source_ref = run.id.to_string();
let description = serde_json::json!({
"fitness_function_id": run.suite_id,
"run_id": run.id,
"run_status": run.status,
"function_name": function_name,
"measured_value": measured_value,
"threshold_value": threshold_value,
"breaking_count": breaking_count,
"findings_count": findings_count,
})
.to_string();
if let Err(e) = Incident::raise(
pool,
RaiseIncidentInput {
org_id: run.org_id,
workspace_id: None,
source: "fitness_function",
source_ref: Some(&source_ref),
dedupe_key: &dedupe_key,
severity,
title: &title,
description: Some(&description),
},
)
.await
{
tracing::warn!(
run_id = %run.id,
function_id = %run.suite_id,
error = %e,
"failed to raise fitness-function incident",
);
}
}
}
_ => {}
}
Ok(())
}
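/// One captured request/response exchange from `runtime_captures`.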
#[allow(missing_docs)]
#[derive(Debug, serde::Serialize, sqlx::FromRow)]
pub struct CaptureExchangeRow {
pub capture_id: String,
pub method: String,
pub path: String,
pub query_params: Option<String>,
pub request_headers: String,
pub request_body: Option<String>,
pub request_body_encoding: String,
pub response_status_code: Option<i32>,
pub response_headers: Option<String>,
pub response_body: Option<String>,
pub response_body_encoding: Option<String>,
pub duration_ms: Option<i64>,
pub occurred_at: chrono::DateTime<chrono::Utc>,
}
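/// Internal lookup of a tunnel reservation by subdomain, returning its
/// owning org, custom-domain state, and status.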
pub async fn get_tunnel_reservation_by_subdomain(
State(state): State<AppState>,
Path(subdomain): Path<String>,
headers: HeaderMap,
) -> ApiResult<Json<serde_json::Value>> {
use mockforge_registry_core::models::TunnelReservation;
require_internal_auth(&headers)?;
let row = TunnelReservation::find_by_subdomain(state.db.pool(), &subdomain)
.await
.map_err(ApiError::Database)?
.ok_or_else(|| ApiError::InvalidRequest("Subdomain not reserved".into()))?;
Ok(Json(serde_json::json!({
"id": row.id,
"org_id": row.org_id,
"name": row.name,
"subdomain": row.subdomain,
"custom_domain": row.custom_domain,
"custom_domain_verified": row.custom_domain_verified,
"status": row.status,
})))
}
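/// Body for the chaos toggle proxy: the desired on/off state.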
#[derive(Debug, Deserialize)]
pub struct ChaosToggleRequest {
pub enabled: bool,
}
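/// Forwards a chaos on/off toggle to a hosted mock's `__mockforge`
/// control endpoint, preferring its internal URL over the public one.
/// The base URL is normalized so a stored `/__mockforge` suffix is not
/// duplicated in the target path.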
pub async fn proxy_chaos_toggle(
State(state): State<AppState>,
Path(deployment_id): Path<Uuid>,
headers: HeaderMap,
Json(body): Json<ChaosToggleRequest>,
) -> ApiResult<Json<serde_json::Value>> {
use mockforge_registry_core::models::HostedMock;
require_internal_auth(&headers)?;
let deployment = HostedMock::find_by_id(state.db.pool(), deployment_id)
.await
.map_err(ApiError::Database)?
.ok_or_else(|| ApiError::InvalidRequest("Deployment not found".into()))?;
let base = deployment
.internal_url
.as_deref()
.or(deployment.deployment_url.as_deref())
.ok_or_else(|| {
ApiError::InvalidRequest(
"Deployment has neither internal_url nor deployment_url".into(),
)
})?;
let target = format!(
"{}/__mockforge/chaos/toggle",
base.trim_end_matches('/').trim_end_matches("/__mockforge")
);
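    // Builder failure is effectively unreachable with these options; fall
    // back to a default client (losing the timeout) rather than failing
    // the whole request.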
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.user_agent("mockforge-registry-chaos-proxy/1.0")
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let resp = client
.post(&target)
.json(&serde_json::json!({ "enabled": body.enabled }))
.send()
.await
.map_err(|e| ApiError::InvalidRequest(format!("chaos proxy fetch failed: {e}")))?;
let status = resp.status();
let payload: serde_json::Value = resp.json().await.unwrap_or_else(|_| serde_json::json!({}));
if !status.is_success() {
return Err(ApiError::InvalidRequest(format!(
"deployment refused chaos toggle: HTTP {status}"
)));
}
Ok(Json(serde_json::json!({
"enabled": body.enabled,
"deployment_response": payload,
})))
}
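/// Aggregated hit count for one method + path within a workspace.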
#[allow(missing_docs)]
#[derive(Debug, serde::Serialize, sqlx::FromRow)]
pub struct WorkspaceEndpointHit {
pub method: String,
pub path: String,
pub hits: i64,
}
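/// Per-endpoint hit counts for a workspace over the trailing 24 hours,
/// capped at the 500 busiest routes.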
pub async fn get_workspace_endpoint_hits(
State(state): State<AppState>,
Path(workspace_id): Path<Uuid>,
headers: HeaderMap,
) -> ApiResult<Json<Vec<WorkspaceEndpointHit>>> {
require_internal_auth(&headers)?;
let rows = sqlx::query_as::<_, WorkspaceEndpointHit>(
r#"
SELECT rc.method,
rc.path,
COUNT(*) AS hits
FROM runtime_captures rc
WHERE rc.workspace_id = $1
AND rc.occurred_at >= NOW() - INTERVAL '24 hours'
GROUP BY rc.method, rc.path
ORDER BY hits DESC
LIMIT 500
"#,
)
.bind(workspace_id)
.fetch_all(state.db.pool())
.await
.map_err(ApiError::Database)?;
Ok(Json(rows))
}
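/// Up to 1000 exchanges belonging to a capture session, oldest first.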
pub async fn get_capture_exchanges(
State(state): State<AppState>,
Path(session_id): Path<Uuid>,
headers: HeaderMap,
) -> ApiResult<Json<Vec<CaptureExchangeRow>>> {
require_internal_auth(&headers)?;
let rows = sqlx::query_as::<_, CaptureExchangeRow>(
r#"
SELECT rc.capture_id,
rc.method,
rc.path,
rc.query_params,
rc.request_headers,
rc.request_body,
rc.request_body_encoding,
rc.response_status_code,
rc.response_headers,
rc.response_body,
rc.response_body_encoding,
rc.duration_ms,
rc.occurred_at
FROM runtime_captures rc
JOIN capture_session_members csm
ON csm.capture_id = rc.capture_id::uuid
WHERE csm.session_id = $1
ORDER BY rc.occurred_at ASC
LIMIT 1000
"#,
)
.bind(session_id)
.fetch_all(state.db.pool())
.await
.map_err(ApiError::Database)?;
Ok(Json(rows))
}
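/// Internal fetch of a fitness function definition by id.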
pub async fn get_fitness_function(
State(state): State<AppState>,
Path(id): Path<Uuid>,
headers: HeaderMap,
) -> ApiResult<Json<FitnessFunction>> {
require_internal_auth(&headers)?;
let row = FitnessFunction::find_by_id(state.db.pool(), id)
.await
.map_err(ApiError::Database)?
.ok_or_else(|| ApiError::InvalidRequest("Fitness function not found".into()))?;
Ok(Json(row))
}
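/// Latency distribution and error counts for a deployment over the
/// requested window.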
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize)]
pub struct DeploymentLatencyStats {
pub count: i64,
pub error_count: i64,
pub p50_ms: Option<f64>,
pub p95_ms: Option<f64>,
pub p99_ms: Option<f64>,
pub max_ms: Option<f64>,
pub avg_ms: Option<f64>,
}
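/// Optional query knobs: window in minutes (clamped to 1..=1440) and an
/// exact-path filter (ignored when empty or longer than 256 chars).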
#[derive(Debug, Deserialize)]
pub struct LatencyStatsQuery {
#[serde(default)]
pub window_minutes: Option<i64>,
#[serde(default)]
pub path: Option<String>,
}
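/// Computes request count, 5xx error count, and latency percentiles
/// (p50/p95/p99) plus max and average for a deployment from
/// `runtime_request_logs`.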
pub async fn get_deployment_latency_stats(
State(state): State<AppState>,
Path(deployment_id): Path<Uuid>,
Query(params): Query<LatencyStatsQuery>,
headers: HeaderMap,
) -> ApiResult<Json<DeploymentLatencyStats>> {
require_internal_auth(&headers)?;
let window = params.window_minutes.unwrap_or(60).clamp(1, 1440);
let path_filter = params.path.as_deref().filter(|s| !s.is_empty() && s.len() <= 256);
let stats = sqlx::query_as::<_, DeploymentLatencyStatsRow>(
r#"
SELECT
COUNT(*)::bigint AS count,
COUNT(*) FILTER (WHERE status >= 500)::bigint AS error_count,
(percentile_cont(0.50) WITHIN GROUP (ORDER BY latency_ms))::float8 AS p50_ms,
(percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms))::float8 AS p95_ms,
(percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms))::float8 AS p99_ms,
MAX(latency_ms)::float8 AS max_ms,
AVG(latency_ms)::float8 AS avg_ms
FROM runtime_request_logs
WHERE deployment_id = $1
AND occurred_at >= NOW() - ($2::bigint * INTERVAL '1 minute')
AND ($3::text IS NULL OR path = $3)
"#,
)
.bind(deployment_id)
.bind(window)
.bind(path_filter)
.fetch_one(state.db.pool())
.await
.map_err(ApiError::Database)?;
Ok(Json(DeploymentLatencyStats {
count: stats.count,
error_count: stats.error_count,
p50_ms: stats.p50_ms,
p95_ms: stats.p95_ms,
p99_ms: stats.p99_ms,
max_ms: stats.max_ms,
avg_ms: stats.avg_ms,
}))
}
#[derive(Debug, sqlx::FromRow)]
struct DeploymentLatencyStatsRow {
count: i64,
error_count: i64,
p50_ms: Option<f64>,
p95_ms: Option<f64>,
p99_ms: Option<f64>,
max_ms: Option<f64>,
avg_ms: Option<f64>,
}
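/// Severity-bucketed contract-diff finding counts plus run metadata for
/// a monitored service.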
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize)]
pub struct MonitoredServiceContractStability {
pub breaking_count: i64,
pub non_breaking_count: i64,
pub cosmetic_count: i64,
pub run_count: i64,
pub latest_run_at: Option<String>,
}
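/// Optional window in minutes (default 24 hours, at most 7 days).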
#[derive(Debug, Deserialize)]
pub struct ContractStabilityQuery {
#[serde(default)]
pub window_minutes: Option<i64>,
}
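/// Aggregates contract-diff findings by severity for a monitored service
/// over the window, together with the run count and most recent run time.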
pub async fn get_monitored_service_contract_stability(
State(state): State<AppState>,
Path(monitored_service_id): Path<Uuid>,
Query(params): Query<ContractStabilityQuery>,
headers: HeaderMap,
) -> ApiResult<Json<MonitoredServiceContractStability>> {
require_internal_auth(&headers)?;
let window = params.window_minutes.unwrap_or(1_440).clamp(1, 10_080);
let row = sqlx::query_as::<_, ContractStabilityRow>(
r#"
SELECT
COUNT(*) FILTER (WHERE f.severity = 'breaking')::bigint AS breaking_count,
COUNT(*) FILTER (WHERE f.severity = 'non_breaking')::bigint AS non_breaking_count,
COUNT(*) FILTER (WHERE f.severity = 'cosmetic')::bigint AS cosmetic_count,
(SELECT COUNT(*) FROM contract_diff_runs
WHERE monitored_service_id = $1
AND started_at >= NOW() - ($2::bigint * INTERVAL '1 minute'))::bigint
AS run_count,
(SELECT MAX(started_at) FROM contract_diff_runs
WHERE monitored_service_id = $1
AND started_at >= NOW() - ($2::bigint * INTERVAL '1 minute'))
AS latest_run_at
FROM contract_diff_findings f
JOIN contract_diff_runs r ON f.run_id = r.id
WHERE r.monitored_service_id = $1
AND r.started_at >= NOW() - ($2::bigint * INTERVAL '1 minute')
"#,
)
.bind(monitored_service_id)
.bind(window)
.fetch_one(state.db.pool())
.await
.map_err(ApiError::Database)?;
Ok(Json(MonitoredServiceContractStability {
breaking_count: row.breaking_count,
non_breaking_count: row.non_breaking_count,
cosmetic_count: row.cosmetic_count,
run_count: row.run_count,
latest_run_at: row.latest_run_at.map(|t| t.to_rfc3339()),
}))
}
#[derive(Debug, sqlx::FromRow)]
struct ContractStabilityRow {
breaking_count: i64,
non_breaking_count: i64,
cosmetic_count: i64,
run_count: i64,
latest_run_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ct_eq_returns_true_for_equal() {
assert!(constant_time_eq(b"abc", b"abc"));
assert!(constant_time_eq(b"", b""));
}
#[test]
fn ct_eq_returns_false_for_different() {
assert!(!constant_time_eq(b"abc", b"abd"));
assert!(!constant_time_eq(b"abc", b"ab"));
assert!(!constant_time_eq(b"abc", b"abcd"));
}
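    // Minimal sketches of the JSON the runner is expected to POST; they
    // pin down the wire shape of `EventBody` and the `#[serde(default)]`
    // on `FinishBody::summary`.
    #[test]
    fn event_body_deserializes_from_json() {
        let body: EventBody = serde_json::from_value(serde_json::json!({
            "seq": 1,
            "event_type": "log",
            "payload": { "line": "hello" },
        }))
        .expect("valid event body");
        assert_eq!(body.seq, 1);
        assert_eq!(body.event_type, "log");
    }
    #[test]
    fn finish_body_summary_defaults_to_none() {
        let body: FinishBody = serde_json::from_value(
            serde_json::json!({ "status": "passed", "runner_seconds": 5 }),
        )
        .expect("valid finish body");
        assert!(body.summary.is_none());
    }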
}