Skip to main content

mockforge_registry_server/handlers/
hosted_mocks.rs

//! Hosted Mocks deployment handlers
//!
//! Provides endpoints for deploying, managing, and monitoring cloud-hosted mock services.
4
5use axum::{
6    extract::{Multipart, Path, Query, State},
7    http::HeaderMap,
8    response::sse::{Event, KeepAlive, Sse},
9    Json,
10};
11use chrono::{DateTime, Utc};
12use futures_util::stream::Stream;
13use serde::{Deserialize, Serialize};
14use std::convert::Infallible;
15use uuid::Uuid;
16
17use crate::{
18    deployment::flyio::FlyioClient,
19    error::{ApiError, ApiResult},
20    fly_logs::LogEntry,
21    middleware::{
22        permission_check::PermissionChecker, permissions::Permission, resolve_org_context, AuthUser,
23    },
24    models::{
25        feature_usage::FeatureType, AuditEventType, DeploymentLog, DeploymentMetrics,
26        DeploymentStatus, HostedMock, Subscription, SubscriptionStatus, TestRun,
27    },
28    AppState,
29};
30use mockforge_registry_core::models::test_run::EnqueueTestRun;
31use tracing::warn;
32
33/// Create a new hosted mock deployment
34pub async fn create_deployment(
35    State(state): State<AppState>,
36    AuthUser(user_id): AuthUser,
37    headers: HeaderMap,
38    Json(request): Json<CreateDeploymentRequest>,
39) -> ApiResult<Json<DeploymentResponse>> {
40    let pool = state.db.pool();
41
42    // Resolve org context
43    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
44        .await
45        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
46
47    // Check permission
48    let checker = PermissionChecker::new(&state);
49    checker
50        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
51        .await?;
52
53    // Block deployment for past_due subscriptions whose payment has been
54    // outstanding for >24h — per #449 acceptance criterion. The earlier
55    // implementation (PR #479) blocked immediately on the first
56    // `invoice.payment_failed` webhook, which was stricter than the spec
57    // called for and made customers feel cut off the moment Stripe's first
58    // retry failed (often a transient card-network blip). The 24h grace
59    // window matches what most SaaS providers do during Stripe's
60    // smart-retry sequence.
61    //
62    // We use `subscription.updated_at` as a proxy for "when the status
63    // flipped to past_due" because every state-change webhook
64    // (`customer.subscription.updated`, `invoice.payment_failed`,
65    // `invoice.payment_succeeded`) calls `Subscription::upsert_from_stripe`
66    // which touches `updated_at`. There's a small accuracy gap if an
67    // unrelated write happens during the grace window, but for the 24h
68    // threshold it's good enough; a future PR can add an explicit
69    // `past_due_since` column if billing wants finer control.
70    if let Some(subscription) = Subscription::find_by_org(pool, org_ctx.org_id).await? {
71        if subscription.status() == SubscriptionStatus::PastDue {
72            const PAST_DUE_GRACE_SECONDS: i64 = 24 * 60 * 60;
73            let elapsed = (chrono::Utc::now() - subscription.updated_at).num_seconds();
74            if elapsed > PAST_DUE_GRACE_SECONDS {
75                return Err(ApiError::InvalidRequest(
76                    "Subscription has been past due for over 24 hours. Please update your payment method in the billing portal before deploying new mocks.".to_string(),
77                ));
78            }
79            tracing::info!(
80                org_id = %org_ctx.org_id,
81                past_due_seconds = elapsed,
82                "past_due subscription within 24h grace; allowing deploy"
83            );
84        }
85    }
86
87    // Check plan limits
88    let limits = &org_ctx.org.limits_json;
89    let max_hosted_mocks = limits.get("max_hosted_mocks").and_then(|v| v.as_i64()).unwrap_or(0);
90
91    if max_hosted_mocks >= 0 {
92        // Count existing active deployments
93        let existing = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
94
95        let active_count = existing
96            .iter()
97            .filter(|m| {
98                matches!(m.status(), DeploymentStatus::Active | DeploymentStatus::Deploying)
99            })
100            .count();
101
102        if active_count as i64 >= max_hosted_mocks {
103            return Err(ApiError::InvalidRequest(format!(
104                "Hosted mocks limit exceeded. Your plan allows {} active deployments. Upgrade to deploy more.",
105                max_hosted_mocks
106            )));
107        }
108    }
109
110    // Validate slug format
111    let generated_slug;
112    let slug = match request.slug.as_deref() {
113        Some(s) => s,
114        None => {
115            generated_slug = request
116                .name
117                .to_lowercase()
118                .chars()
119                .map(|c| {
120                    if c.is_alphanumeric() || c == '-' {
121                        c
122                    } else {
123                        '-'
124                    }
125                })
126                .collect::<String>()
127                .trim_matches('-')
128                .replace("--", "-");
129            &generated_slug
130        }
131    };
132
133    if !slug.chars().all(|c| c.is_alphanumeric() || c == '-') {
134        return Err(ApiError::InvalidRequest(
135            "Slug must contain only alphanumeric characters and hyphens".to_string(),
136        ));
137    }
138
139    // Check if slug is already taken
140    if state.store.find_hosted_mock_by_slug(org_ctx.org_id, slug).await?.is_some() {
141        return Err(ApiError::InvalidRequest(format!(
142            "A deployment with slug '{}' already exists",
143            slug
144        )));
145    }
146
147    // Resolve and plan-gate the protocol set. HTTP is always present and is
148    // free; gRPC requires Pro; brokers (SMTP/MQTT/Kafka/AMQP/TCP) require
149    // Team. Reject with an explicit error so the UI can surface "upgrade to
150    // enable X" rather than a generic 400.
151    let mut enabled_protocols = request.enabled_protocols.clone().unwrap_or_default();
152    if !enabled_protocols.contains(&crate::models::Protocol::Http) {
153        enabled_protocols.insert(0, crate::models::Protocol::Http);
154    }
155    let plan = org_ctx.org.plan.clone();
156    if !crate::models::protocols_allowed_on_plan(&enabled_protocols, &plan) {
157        let blocked: Vec<String> = enabled_protocols
158            .iter()
159            .filter(|p| !crate::models::protocols_allowed_on_plan(&[**p], &plan))
160            .map(|p| format!("{:?}", p))
161            .collect();
162        return Err(ApiError::InvalidRequest(format!(
163            "These protocols require a higher plan than '{}': {}",
164            plan,
165            blocked.join(", ")
166        )));
167    }
168
169    // Persist enabled protocols and upstream_url into the deployment's
170    // config_json so the orchestrator can read them when building the
171    // Fly machine. Additive merge preserves any keys the caller supplied.
172    let mut config_json = request.config_json.clone();
173    if !config_json.is_object() {
174        config_json = serde_json::json!({});
175    }
176    if let Some(obj) = config_json.as_object_mut() {
177        obj.insert(
178            "enabled_protocols".to_string(),
179            serde_json::to_value(&enabled_protocols).unwrap_or(serde_json::Value::Null),
180        );
181        if let Some(upstream) = request.upstream_url.as_ref() {
182            let trimmed = upstream.trim();
183            if !trimmed.is_empty() {
184                obj.insert(
185                    "upstream_url".to_string(),
186                    serde_json::Value::String(trimmed.to_string()),
187                );
188            }
189        }
190    }
191
192    // Create deployment record
193    let deployment = state
194        .store
195        .create_hosted_mock(
196            org_ctx.org_id,
197            request.project_id,
198            &request.name,
199            slug,
200            request.description.as_deref(),
201            config_json,
202            request.openapi_spec_url.as_deref(),
203            request.region.as_deref(),
204        )
205        .await?;
206
207    // Log deployment creation
208    DeploymentLog::create(
209        pool,
210        deployment.id,
211        "info",
212        "Deployment created",
213        Some(serde_json::json!({
214            "name": request.name,
215            "slug": slug,
216        })),
217    )
218    .await
219    .map_err(ApiError::Database)?;
220
221    // Mark as pending - the deployment orchestrator will pick it up and deploy it
222    // The orchestrator polls for pending/deploying deployments every 10 seconds
223    state
224        .store
225        .update_hosted_mock_status(deployment.id, DeploymentStatus::Pending, None)
226        .await?;
227
228    // Track feature usage
229    state
230        .store
231        .record_feature_usage(
232            org_ctx.org_id,
233            Some(user_id),
234            FeatureType::HostedMockDeploy,
235            Some(serde_json::json!({
236                "deployment_id": deployment.id,
237                "name": request.name,
238                "slug": slug,
239            })),
240        )
241        .await;
242
243    // Record audit log
244    let ip_address = headers
245        .get("X-Forwarded-For")
246        .or_else(|| headers.get("X-Real-IP"))
247        .and_then(|h| h.to_str().ok())
248        .map(|s| s.split(',').next().unwrap_or(s).trim());
249    let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
250
251    state
252        .store
253        .record_audit_event(
254            org_ctx.org_id,
255            Some(user_id),
256            AuditEventType::DeploymentCreated,
257            format!("Hosted mock deployment '{}' created", request.name),
258            Some(serde_json::json!({
259                "deployment_id": deployment.id,
260                "name": request.name,
261                "slug": slug,
262                "project_id": request.project_id,
263            })),
264            ip_address,
265            user_agent,
266        )
267        .await;
268
269    // Return deployment info
270    let deployment = state.store.find_hosted_mock_by_id(deployment.id).await?.ok_or_else(|| {
271        ApiError::Internal(anyhow::anyhow!("Failed to retrieve created deployment"))
272    })?;
273
274    Ok(Json(DeploymentResponse::from(deployment)))
275}
276
277/// List all deployments for the organization
278pub async fn list_deployments(
279    State(state): State<AppState>,
280    AuthUser(user_id): AuthUser,
281    headers: HeaderMap,
282) -> ApiResult<Json<Vec<DeploymentResponse>>> {
283    // Resolve org context
284    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
285        .await
286        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
287
288    // Get all deployments
289    let deployments = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
290
291    let responses: Vec<DeploymentResponse> =
292        deployments.into_iter().map(DeploymentResponse::from).collect();
293
294    Ok(Json(responses))
295}
296
297/// Get deployment details
298pub async fn get_deployment(
299    State(state): State<AppState>,
300    AuthUser(user_id): AuthUser,
301    headers: HeaderMap,
302    Path(deployment_id): Path<Uuid>,
303) -> ApiResult<Json<DeploymentResponse>> {
304    // Resolve org context
305    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
306        .await
307        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
308
309    // Get deployment
310    let deployment = state
311        .store
312        .find_hosted_mock_by_id(deployment_id)
313        .await?
314        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
315
316    // Verify access
317    if deployment.org_id != org_ctx.org_id {
318        return Err(ApiError::InvalidRequest(
319            "You don't have access to this deployment".to_string(),
320        ));
321    }
322
323    Ok(Json(DeploymentResponse::from(deployment)))
324}
325
326/// Update deployment status (internal/admin use)
327pub async fn update_deployment_status(
328    State(state): State<AppState>,
329    AuthUser(user_id): AuthUser,
330    headers: HeaderMap,
331    Path(deployment_id): Path<Uuid>,
332    Json(request): Json<UpdateStatusRequest>,
333) -> ApiResult<Json<DeploymentResponse>> {
334    // Resolve org context
335    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
336        .await
337        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
338
339    // Check permission
340    let checker = PermissionChecker::new(&state);
341    checker
342        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
343        .await?;
344
345    // Get deployment
346    let deployment = state
347        .store
348        .find_hosted_mock_by_id(deployment_id)
349        .await?
350        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
351
352    // Verify access
353    if deployment.org_id != org_ctx.org_id {
354        return Err(ApiError::InvalidRequest(
355            "You don't have access to this deployment".to_string(),
356        ));
357    }
358
359    // Update status
360    let status = DeploymentStatus::from_str(&request.status)
361        .ok_or_else(|| ApiError::InvalidRequest("Invalid status".to_string()))?;
362
363    state
364        .store
365        .update_hosted_mock_status(deployment_id, status, request.error_message.as_deref())
366        .await?;
367
368    // Update URLs if provided
369    if request.deployment_url.is_some() || request.internal_url.is_some() {
370        state
371            .store
372            .update_hosted_mock_urls(
373                deployment_id,
374                request.deployment_url.as_deref(),
375                request.internal_url.as_deref(),
376            )
377            .await?;
378    }
379
380    // Get updated deployment
381    let deployment = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
382        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
383    })?;
384
385    // Send deployment status notification email (non-blocking)
386    if let Ok(Some(org)) = state.store.find_organization_by_id(deployment.org_id).await {
387        if let Ok(Some(owner)) = state.store.find_user_by_id(org.owner_id).await {
388            let status_str = format!("{:?}", deployment.status()).to_lowercase();
389            let email_msg = crate::email::EmailService::generate_deployment_status_email(
390                &owner.username,
391                &owner.email,
392                &deployment.name,
393                &status_str,
394                deployment.deployment_url.as_deref(),
395                request.error_message.as_deref(),
396            );
397
398            tokio::spawn(async move {
399                match crate::email::EmailService::from_env() {
400                    Ok(email_service) => {
401                        if let Err(e) = email_service.send(email_msg).await {
402                            tracing::warn!("Failed to send deployment status email: {}", e);
403                        }
404                    }
405                    Err(e) => {
406                        tracing::warn!("Failed to create email service: {}", e);
407                    }
408                }
409            });
410        }
411    }
412
413    Ok(Json(DeploymentResponse::from(deployment)))
414}
415
416/// Delete a deployment
417pub async fn delete_deployment(
418    State(state): State<AppState>,
419    AuthUser(user_id): AuthUser,
420    headers: HeaderMap,
421    Path(deployment_id): Path<Uuid>,
422) -> ApiResult<Json<serde_json::Value>> {
423    let pool = state.db.pool();
424
425    // Resolve org context
426    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
427        .await
428        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
429
430    // Check permission
431    let checker = PermissionChecker::new(&state);
432    checker
433        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockDelete)
434        .await?;
435
436    // Get deployment
437    let deployment = state
438        .store
439        .find_hosted_mock_by_id(deployment_id)
440        .await?
441        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
442
443    // Verify access
444    if deployment.org_id != org_ctx.org_id {
445        return Err(ApiError::InvalidRequest(
446            "You don't have access to this deployment".to_string(),
447        ));
448    }
449
450    // Record audit log before deletion
451    let ip_address = headers
452        .get("X-Forwarded-For")
453        .or_else(|| headers.get("X-Real-IP"))
454        .and_then(|h| h.to_str().ok())
455        .map(|s| s.split(',').next().unwrap_or(s).trim());
456    let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
457
458    state
459        .store
460        .record_audit_event(
461            org_ctx.org_id,
462            Some(user_id),
463            AuditEventType::DeploymentDeleted,
464            format!("Hosted mock deployment '{}' deleted", deployment.name),
465            Some(serde_json::json!({
466                "deployment_id": deployment.id,
467                "name": deployment.name,
468                "slug": deployment.slug,
469            })),
470            ip_address,
471            user_agent,
472        )
473        .await;
474
475    // Update status to deleting before cleanup
476    state
477        .store
478        .update_hosted_mock_status(deployment_id, DeploymentStatus::Deleting, None)
479        .await?;
480
481    // Trigger actual deletion (stop service, cleanup resources, etc.)
482    // Try to delete from Fly.io if configured
483    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
484        let flyio_client = FlyioClient::new(flyio_token);
485
486        // Generate app name (same as in orchestrator)
487        let app_name = format!(
488            "mockforge-{}-{}",
489            deployment
490                .org_id
491                .to_string()
492                .replace("-", "")
493                .chars()
494                .take(8)
495                .collect::<String>(),
496            deployment.slug
497        );
498
499        // Try to get machine_id from metadata
500        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
501
502        if let Some(machine_id) = machine_id {
503            // Delete the specific machine
504            match flyio_client.delete_machine(&app_name, machine_id).await {
505                Ok(_) => {
506                    DeploymentLog::create(
507                        pool,
508                        deployment_id,
509                        "info",
510                        &format!("Deleted Fly.io machine: {}", machine_id),
511                        None,
512                    )
513                    .await
514                    .ok();
515                }
516                Err(e) => {
517                    warn!("Failed to delete Fly.io machine {}: {}", machine_id, e);
518                    DeploymentLog::create(
519                        pool,
520                        deployment_id,
521                        "warning",
522                        &format!("Failed to delete Fly.io machine: {}", e),
523                        None,
524                    )
525                    .await
526                    .ok();
527                    // Continue with app deletion and database deletion
528                }
529            }
530        } else {
531            // Machine ID not found in metadata, try to list and delete all machines
532            warn!(
533                "Machine ID not found in metadata for deployment {}, attempting to list machines",
534                deployment_id
535            );
536            match flyio_client.list_machines(&app_name).await {
537                Ok(machines) => {
538                    for machine in machines {
539                        if let Err(e) = flyio_client.delete_machine(&app_name, &machine.id).await {
540                            warn!("Failed to delete Fly.io machine {}: {}", machine.id, e);
541                        } else {
542                            DeploymentLog::create(
543                                pool,
544                                deployment_id,
545                                "info",
546                                &format!("Deleted Fly.io machine: {}", machine.id),
547                                None,
548                            )
549                            .await
550                            .ok();
551                        }
552                    }
553                }
554                Err(e) => {
555                    warn!("Failed to list Fly.io machines for app {}: {}", app_name, e);
556                    // Continue with app deletion and database deletion
557                }
558            }
559        }
560
561        // Delete the Fly.io app itself to avoid empty apps piling up
562        match flyio_client.delete_app(&app_name).await {
563            Ok(_) => {
564                DeploymentLog::create(
565                    pool,
566                    deployment_id,
567                    "info",
568                    &format!("Deleted Fly.io app: {}", app_name),
569                    None,
570                )
571                .await
572                .ok();
573            }
574            Err(e) => {
575                warn!("Failed to delete Fly.io app {}: {}", app_name, e);
576            }
577        }
578    }
579
580    // Soft delete from database
581    state.store.delete_hosted_mock(deployment_id).await?;
582
583    DeploymentLog::create(pool, deployment_id, "info", "Deployment deleted successfully", None)
584        .await
585        .ok(); // Log but don't fail on log error
586
587    Ok(Json(serde_json::json!({
588        "success": true,
589        "message": "Deployment deleted"
590    })))
591}
592
593/// Request body for redeployment (all fields optional)
594#[derive(Debug, Deserialize, Default)]
595pub struct RedeployRequest {
596    /// Updated OpenAPI spec (replaces existing config)
597    pub config_json: Option<serde_json::Value>,
598    /// Updated spec URL
599    pub openapi_spec_url: Option<String>,
600}
601
602/// Redeploy an existing hosted mock deployment
603///
604/// Updates the machine image and optionally the spec, then restarts.
605pub async fn redeploy_deployment(
606    State(state): State<AppState>,
607    AuthUser(user_id): AuthUser,
608    headers: HeaderMap,
609    Path(deployment_id): Path<Uuid>,
610    body: Option<Json<RedeployRequest>>,
611) -> ApiResult<Json<serde_json::Value>> {
612    let pool = state.db.pool();
613
614    // Resolve org context
615    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
616        .await
617        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
618
619    // Check permission (reuse deploy permission)
620    let checker = PermissionChecker::new(&state);
621    checker
622        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
623        .await?;
624    // pool kept for DeploymentLog + the spawned redeploy orchestration task below
625
626    // Get existing deployment
627    let deployment = state
628        .store
629        .find_hosted_mock_by_id(deployment_id)
630        .await?
631        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
632
633    // Verify ownership
634    if deployment.org_id != org_ctx.org_id {
635        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
636    }
637
638    // Only allow redeployment of active or failed deployments
639    let status = deployment.status();
640    if !matches!(status, DeploymentStatus::Active | DeploymentStatus::Failed) {
641        return Err(ApiError::InvalidRequest(format!(
642            "Cannot redeploy a deployment with status '{}'. Must be 'active' or 'failed'.",
643            status
644        )));
645    }
646
647    // Update spec if provided
648    let request = body.map(|b| b.0).unwrap_or_default();
649    if request.config_json.is_some() || request.openapi_spec_url.is_some() {
650        let mut query = String::from("UPDATE hosted_mocks SET updated_at = NOW()");
651        let mut param_count = 0;
652
653        if request.config_json.is_some() {
654            param_count += 1;
655            query.push_str(&format!(", config_json = ${}", param_count));
656        }
657        if request.openapi_spec_url.is_some() {
658            param_count += 1;
659            query.push_str(&format!(", openapi_spec_url = ${}", param_count));
660        }
661        param_count += 1;
662        query.push_str(&format!(" WHERE id = ${}", param_count));
663
664        let mut q = sqlx::query(&query);
665        if let Some(ref config) = request.config_json {
666            q = q.bind(config);
667        }
668        if let Some(ref spec_url) = request.openapi_spec_url {
669            q = q.bind(spec_url);
670        }
671        q = q.bind(deployment_id);
672        q.execute(pool).await.map_err(|e| {
673            ApiError::Internal(anyhow::anyhow!("Failed to update deployment: {}", e))
674        })?;
675    }
676
677    // Update status to deploying
678    state
679        .store
680        .update_hosted_mock_status(deployment_id, DeploymentStatus::Deploying, None)
681        .await?;
682
683    DeploymentLog::create(pool, deployment_id, "info", "Redeployment initiated", None)
684        .await
685        .ok();
686
687    // Trigger redeployment in background
688    let pool_clone = pool.clone();
689    let deployment_id_clone = deployment_id;
690    tokio::spawn(async move {
691        let pool = &pool_clone;
692
693        // Fetch the updated deployment
694        let updated_deployment = match HostedMock::find_by_id(pool, deployment_id_clone).await {
695            Ok(Some(d)) => d,
696            Ok(None) => {
697                tracing::error!("Deployment {} not found during redeploy", deployment_id_clone);
698                return;
699            }
700            Err(e) => {
701                tracing::error!(
702                    "Failed to fetch deployment {} for redeploy: {}",
703                    deployment_id_clone,
704                    e
705                );
706                return;
707            }
708        };
709
710        // Try to redeploy via Fly.io if configured
711        if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
712            let flyio_client = FlyioClient::new(flyio_token);
713
714            let machine_id = updated_deployment
715                .metadata_json
716                .get("flyio_machine_id")
717                .and_then(|v| v.as_str());
718
719            if let Some(machine_id) = machine_id {
720                let app_name = format!(
721                    "mockforge-{}-{}",
722                    updated_deployment
723                        .org_id
724                        .to_string()
725                        .replace('-', "")
726                        .chars()
727                        .take(8)
728                        .collect::<String>(),
729                    updated_deployment.slug
730                );
731
732                // Build updated machine config
733                let mut env = std::collections::HashMap::new();
734                env.insert(
735                    "MOCKFORGE_DEPLOYMENT_ID".to_string(),
736                    updated_deployment.id.to_string(),
737                );
738                env.insert("MOCKFORGE_ORG_ID".to_string(), updated_deployment.org_id.to_string());
739                if let Ok(config_str) = serde_json::to_string(&updated_deployment.config_json) {
740                    env.insert("MOCKFORGE_CONFIG".to_string(), config_str);
741                }
742                env.insert("PORT".to_string(), "3000".to_string());
743
744                if let Some(ref spec_url) = updated_deployment.openapi_spec_url {
745                    env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
746                }
747
748                let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
749                    .unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
750
751                use crate::deployment::flyio::{
752                    FlyioCheck, FlyioMachineConfig, FlyioPort, FlyioRegistryAuth, FlyioService,
753                };
754
755                let services = vec![FlyioService {
756                    protocol: "tcp".to_string(),
757                    internal_port: 3000,
758                    ports: vec![
759                        FlyioPort {
760                            port: 80,
761                            handlers: vec!["http".to_string()],
762                        },
763                        FlyioPort {
764                            port: 443,
765                            handlers: vec!["tls".to_string(), "http".to_string()],
766                        },
767                    ],
768                }];
769
770                let mut checks = std::collections::HashMap::new();
771                checks.insert(
772                    "alive".to_string(),
773                    FlyioCheck {
774                        check_type: "http".to_string(),
775                        port: 3000,
776                        grace_period: "10s".to_string(),
777                        interval: "15s".to_string(),
778                        method: "GET".to_string(),
779                        timeout: "2s".to_string(),
780                        tls_skip_verify: false,
781                        path: Some("/health/live".to_string()),
782                    },
783                );
784
785                let machine_config = FlyioMachineConfig {
786                    image,
787                    env,
788                    services,
789                    checks: Some(checks),
790                    // None = accept Fly's API default
791                    // (shared-cpu-1x:256MB) — same as other call
792                    // sites. Plugin-enabled redeploys will set this
793                    // via FlyioGuest::for_hosted_mock in Phase 2.
794                    guest: None,
795                };
796
797                // Build registry auth
798                let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
799                    std::env::var("DOCKER_REGISTRY_SERVER"),
800                    std::env::var("DOCKER_REGISTRY_USERNAME"),
801                    std::env::var("DOCKER_REGISTRY_PASSWORD"),
802                ) {
803                    Some(FlyioRegistryAuth {
804                        server,
805                        username,
806                        password,
807                    })
808                } else if machine_config.image.starts_with("registry.fly.io/") {
809                    Some(FlyioRegistryAuth {
810                        server: "registry.fly.io".to_string(),
811                        username: "x".to_string(),
812                        password: flyio_client.api_token().to_string(),
813                    })
814                } else {
815                    None
816                };
817
818                match flyio_client
819                    .update_machine(&app_name, machine_id, machine_config, registry_auth)
820                    .await
821                {
822                    Ok(_) => {
823                        let _ = DeploymentLog::create(
824                            pool,
825                            deployment_id_clone,
826                            "info",
827                            "Machine updated and restarting",
828                            None,
829                        )
830                        .await;
831                    }
832                    Err(e) => {
833                        tracing::error!("Redeployment failed for {}: {:#}", deployment_id_clone, e);
834                        let _ = HostedMock::update_status(
835                            pool,
836                            deployment_id_clone,
837                            DeploymentStatus::Failed,
838                            Some(&format!("Redeployment failed: {}", e)),
839                        )
840                        .await;
841                        let _ = DeploymentLog::create(
842                            pool,
843                            deployment_id_clone,
844                            "error",
845                            &format!("Redeployment failed: {}", e),
846                            None,
847                        )
848                        .await;
849                        return;
850                    }
851                }
852            } else {
853                tracing::error!(
854                    "No Fly.io machine ID found for deployment {}",
855                    deployment_id_clone
856                );
857                let _ = HostedMock::update_status(
858                    pool,
859                    deployment_id_clone,
860                    DeploymentStatus::Failed,
861                    Some("No Fly.io machine ID found in deployment metadata"),
862                )
863                .await;
864                return;
865            }
866        }
867
868        // Mark as active
869        let _ =
870            HostedMock::update_status(pool, deployment_id_clone, DeploymentStatus::Active, None)
871                .await;
872
873        let _ = DeploymentLog::create(
874            pool,
875            deployment_id_clone,
876            "info",
877            "Redeployment completed successfully",
878            None,
879        )
880        .await;
881
882        tracing::info!("Successfully redeployed mock service: {}", deployment_id_clone);
883    });
884
885    Ok(Json(serde_json::json!({
886        "id": deployment_id,
887        "status": "deploying",
888        "message": "Redeployment initiated"
889    })))
890}
891
892fn deployment_app_name(deployment: &HostedMock) -> String {
893    format!(
894        "mockforge-{}-{}",
895        deployment
896            .org_id
897            .to_string()
898            .replace('-', "")
899            .chars()
900            .take(8)
901            .collect::<String>(),
902        deployment.slug
903    )
904}
905
906/// Stop a running hosted mock deployment.
907///
908/// Gracefully stops the Fly.io machine (without deleting it) and marks
909/// the deployment as `stopped`. Only active deployments can be stopped.
910pub async fn stop_deployment(
911    State(state): State<AppState>,
912    AuthUser(user_id): AuthUser,
913    headers: HeaderMap,
914    Path(deployment_id): Path<Uuid>,
915) -> ApiResult<Json<DeploymentResponse>> {
916    let pool = state.db.pool();
917
918    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
919        .await
920        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
921
922    let checker = PermissionChecker::new(&state);
923    checker
924        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
925        .await?;
926
927    let deployment = state
928        .store
929        .find_hosted_mock_by_id(deployment_id)
930        .await?
931        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
932
933    if deployment.org_id != org_ctx.org_id {
934        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
935    }
936
937    let status = deployment.status();
938    if !matches!(status, DeploymentStatus::Active) {
939        return Err(ApiError::InvalidRequest(format!(
940            "Cannot stop a deployment with status '{}'. Must be 'active'.",
941            status
942        )));
943    }
944
945    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
946        let flyio_client = FlyioClient::new(flyio_token);
947        let app_name = deployment_app_name(&deployment);
948        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
949
950        if let Some(machine_id) = machine_id {
951            flyio_client.stop_machine(&app_name, machine_id).await.map_err(|e| {
952                ApiError::Internal(anyhow::anyhow!("Failed to stop machine: {}", e))
953            })?;
954        } else {
955            warn!(
956                "No Fly.io machine ID found for deployment {}; marking as stopped anyway",
957                deployment_id
958            );
959        }
960    }
961
962    state
963        .store
964        .update_hosted_mock_status(deployment_id, DeploymentStatus::Stopped, None)
965        .await?;
966
967    DeploymentLog::create(pool, deployment_id, "info", "Deployment stopped", None)
968        .await
969        .ok();
970
971    let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
972        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
973    })?;
974
975    Ok(Json(DeploymentResponse::from(updated)))
976}
977
978/// Start a stopped hosted mock deployment.
979pub async fn start_deployment(
980    State(state): State<AppState>,
981    AuthUser(user_id): AuthUser,
982    headers: HeaderMap,
983    Path(deployment_id): Path<Uuid>,
984) -> ApiResult<Json<DeploymentResponse>> {
985    let pool = state.db.pool();
986
987    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
988        .await
989        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
990
991    let checker = PermissionChecker::new(&state);
992    checker
993        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
994        .await?;
995
996    let deployment = state
997        .store
998        .find_hosted_mock_by_id(deployment_id)
999        .await?
1000        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1001
1002    if deployment.org_id != org_ctx.org_id {
1003        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
1004    }
1005
1006    let status = deployment.status();
1007    if !matches!(status, DeploymentStatus::Stopped) {
1008        return Err(ApiError::InvalidRequest(format!(
1009            "Cannot start a deployment with status '{}'. Must be 'stopped'.",
1010            status
1011        )));
1012    }
1013
1014    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
1015        let flyio_client = FlyioClient::new(flyio_token);
1016        let app_name = deployment_app_name(&deployment);
1017        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
1018
1019        if let Some(machine_id) = machine_id {
1020            flyio_client.start_machine(&app_name, machine_id).await.map_err(|e| {
1021                ApiError::Internal(anyhow::anyhow!("Failed to start machine: {}", e))
1022            })?;
1023        } else {
1024            return Err(ApiError::InvalidRequest(
1025                "No Fly.io machine ID found in deployment metadata; cannot start".to_string(),
1026            ));
1027        }
1028    }
1029
1030    state
1031        .store
1032        .update_hosted_mock_status(deployment_id, DeploymentStatus::Active, None)
1033        .await?;
1034
1035    DeploymentLog::create(pool, deployment_id, "info", "Deployment started", None)
1036        .await
1037        .ok();
1038
1039    let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
1040        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
1041    })?;
1042
1043    Ok(Json(DeploymentResponse::from(updated)))
1044}
1045
1046/// Get deployment logs
1047pub async fn get_deployment_logs(
1048    State(state): State<AppState>,
1049    AuthUser(user_id): AuthUser,
1050    headers: HeaderMap,
1051    Path(deployment_id): Path<Uuid>,
1052) -> ApiResult<Json<Vec<LogResponse>>> {
1053    let pool = state.db.pool();
1054
1055    // Resolve org context
1056    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1057        .await
1058        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1059
1060    // Get deployment
1061    let deployment = HostedMock::find_by_id(pool, deployment_id)
1062        .await
1063        .map_err(ApiError::Database)?
1064        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1065
1066    // Verify access
1067    if deployment.org_id != org_ctx.org_id {
1068        return Err(ApiError::InvalidRequest(
1069            "You don't have access to this deployment".to_string(),
1070        ));
1071    }
1072
1073    // Get logs
1074    let logs = DeploymentLog::find_by_mock(pool, deployment_id, Some(100))
1075        .await
1076        .map_err(ApiError::Database)?;
1077
1078    let responses: Vec<LogResponse> = logs.into_iter().map(LogResponse::from).collect();
1079
1080    Ok(Json(responses))
1081}
1082
/// Query parameters for the runtime-logs endpoint (`get_runtime_logs`).
///
/// Field names are part of the HTTP query-string contract.
#[derive(Debug, Deserialize)]
pub struct RuntimeLogsQuery {
    /// Maximum entries to return. Forwarded uncapped to the Fly logs client;
    /// when `None`, the client's env-configured default applies.
    pub limit: Option<u32>,
    /// RFC3339 timestamp; only entries strictly newer than this are returned.
    /// The handler ignores a value that does not parse as RFC3339 (treated
    /// as absent) rather than rejecting the request.
    pub since: Option<String>,
}
1091
1092/// Get runtime logs for a deployment by polling the Fly logs API.
1093///
1094/// This is the new "logs" surface (#224). The historical
1095/// `GET /api/v1/hosted-mocks/{id}/logs` endpoint stays as deployment events
1096/// (lifecycle entries from the local `deployment_logs` table) so the UI can
1097/// surface both views: an "Events" tab from the existing endpoint and a
1098/// "Logs" tab backed by this one.
1099pub async fn get_runtime_logs(
1100    State(state): State<AppState>,
1101    AuthUser(user_id): AuthUser,
1102    headers: HeaderMap,
1103    Path(deployment_id): Path<Uuid>,
1104    Query(params): Query<RuntimeLogsQuery>,
1105) -> ApiResult<Json<Vec<LogEntry>>> {
1106    let pool = state.db.pool();
1107
1108    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1109        .await
1110        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1111
1112    let deployment = HostedMock::find_by_id(pool, deployment_id)
1113        .await
1114        .map_err(ApiError::Database)?
1115        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1116
1117    if deployment.org_id != org_ctx.org_id {
1118        return Err(ApiError::InvalidRequest(
1119            "You don't have access to this deployment".to_string(),
1120        ));
1121    }
1122
1123    let Some(client) = crate::fly_logs::global() else {
1124        // No Fly token configured — return empty rather than 500. Operators
1125        // see this in self-hosted / dev; the UI shows a "not configured"
1126        // hint when the list is empty and the configured flag (sent in a
1127        // header below) is false.
1128        return Ok(Json(Vec::new()));
1129    };
1130
1131    let since = params
1132        .since
1133        .as_deref()
1134        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1135        .map(|d| d.with_timezone(&Utc));
1136
1137    let app_name = deployment.fly_app_name();
1138    let entries = client.fetch_recent(&app_name, since, params.limit).await.map_err(|e| {
1139        warn!(error = %e, app_name = %app_name, "Fly runtime logs fetch failed");
1140        ApiError::Internal(anyhow::anyhow!("Fly logs query failed: {}", e))
1141    })?;
1142
1143    Ok(Json(entries))
1144}
1145
1146/// SSE stream of runtime logs. Polls the Fly logs API every few seconds and
1147/// emits new entries as `data:` events. Closes on the first transient error
1148/// after surfacing it as a `data:` event so the browser can render a banner.
1149pub async fn stream_runtime_logs(
1150    State(state): State<AppState>,
1151    AuthUser(user_id): AuthUser,
1152    headers: HeaderMap,
1153    Path(deployment_id): Path<Uuid>,
1154) -> ApiResult<Sse<impl Stream<Item = Result<Event, Infallible>>>> {
1155    let pool = state.db.pool();
1156    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1157        .await
1158        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1159
1160    let deployment = HostedMock::find_by_id(pool, deployment_id)
1161        .await
1162        .map_err(ApiError::Database)?
1163        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1164
1165    if deployment.org_id != org_ctx.org_id {
1166        return Err(ApiError::InvalidRequest(
1167            "You don't have access to this deployment".to_string(),
1168        ));
1169    }
1170
1171    let app_name = deployment.fly_app_name();
1172    let stream = build_runtime_logs_stream(app_name);
1173
1174    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
1175}
1176
1177/// Build the SSE event stream for runtime logs. Each tick polls Fly with a
1178/// `since` cursor that advances past the latest entry observed so we don't
1179/// duplicate events on refetch.
1180fn build_runtime_logs_stream(app_name: String) -> impl Stream<Item = Result<Event, Infallible>> {
1181    use futures_util::stream::unfold;
1182
1183    /// Per-stream poll state: cursor + tick count for an initial empty-poll
1184    /// quietness. We start the cursor a few seconds in the past so the first
1185    /// poll picks up "what's happening now" without dumping the whole window.
1186    struct State {
1187        cursor: DateTime<Utc>,
1188        client: Option<&'static crate::fly_logs::FlyLogsClient>,
1189        app_name: String,
1190        emitted_unconfigured: bool,
1191    }
1192
1193    let state = State {
1194        cursor: Utc::now() - chrono::Duration::seconds(30),
1195        client: crate::fly_logs::global(),
1196        app_name,
1197        emitted_unconfigured: false,
1198    };
1199
1200    unfold(state, |mut st| async move {
1201        // Slow poll loop. 2 seconds keeps the UI responsive without hammering
1202        // Fly's API; tighten once we move to the NATS subscription path
1203        // (#232).
1204        if st.client.is_none() && !st.emitted_unconfigured {
1205            st.emitted_unconfigured = true;
1206            let event = Event::default()
1207                .event("config")
1208                .data("Fly runtime logs are not configured (FLYIO_API_TOKEN unset)");
1209            return Some((Ok(event), st));
1210        }
1211
1212        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1213
1214        let Some(client) = st.client else {
1215            // Configured-flag already emitted; keep stream alive with a
1216            // periodic comment so the connection doesn't close.
1217            let event = Event::default().comment("idle");
1218            return Some((Ok(event), st));
1219        };
1220
1221        match client.fetch_recent(&st.app_name, Some(st.cursor), None).await {
1222            Ok(entries) if entries.is_empty() => {
1223                let event = Event::default().comment("no-new-logs");
1224                Some((Ok(event), st))
1225            }
1226            Ok(entries) => {
1227                // Advance cursor to the newest entry we just emitted.
1228                if let Some(latest) = entries.iter().map(|e| e.timestamp).max() {
1229                    st.cursor = latest;
1230                }
1231                let payload = serde_json::to_string(&entries).unwrap_or_else(|_| "[]".to_string());
1232                let event = Event::default().event("logs").data(payload);
1233                Some((Ok(event), st))
1234            }
1235            Err(err) => {
1236                let payload = serde_json::json!({ "error": err.to_string() }).to_string();
1237                let event = Event::default().event("error").data(payload);
1238                Some((Ok(event), st))
1239            }
1240        }
1241    })
1242}
1243
/// One captured request/response event. Mirrors `RequestLogEvent` in
/// `mockforge-observability::log_shipper` — both sides serialize via serde
/// so the wire format is the canonical contract.
///
/// Used as the ingest wire type (`ingest_runtime_logs`) and as the read
/// model returned by `get_runtime_requests`. Fields marked
/// `#[serde(default)]` become `None` when absent from incoming JSON.
#[derive(Debug, Deserialize, Serialize)]
pub struct RuntimeRequestEvent {
    /// When the request happened; persisted as `occurred_at`.
    pub timestamp: DateTime<Utc>,
    /// HTTP method as reported by the shipper.
    pub method: String,
    /// Request path as reported by the shipper.
    pub path: String,
    /// Status code of the exchange. Persisted in a column bound as `i16`.
    pub status: u16,
    /// Latency in milliseconds. Persisted in a column bound as `i32`.
    pub latency_ms: u32,
    /// Route pattern the mock matched, if any.
    #[serde(default)]
    pub matched_route: Option<String>,
    #[serde(default)]
    pub client_ip: Option<String>,
    #[serde(default)]
    pub user_agent: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    /// Presumably request payload size in bytes. TODO confirm against the shipper.
    #[serde(default)]
    pub bytes_in: Option<i64>,
    /// Presumably response payload size in bytes. TODO confirm against the shipper.
    #[serde(default)]
    pub bytes_out: Option<i64>,
}
1267
/// Request body of `ingest_runtime_logs`: one batch of request-log events.
#[derive(Debug, Deserialize)]
pub struct IngestPayload {
    pub events: Vec<RuntimeRequestEvent>,
}
1272
/// Response of the log and capture ingest endpoints.
#[derive(Debug, Serialize)]
pub struct IngestResponse {
    /// Number of items from the batch that were processed. Anything beyond the
    /// endpoint's batch cap is not counted.
    pub accepted: usize,
}
1277
1278/// Ingest a batch of structured request logs from a hosted-mock container.
1279///
1280/// This endpoint is **not** behind the user-scoped `auth_middleware` — it
1281/// authenticates with a deployment-scoped JWT minted by the orchestrator
1282/// at deploy time and passed to the container as `MOCKFORGE_LOG_INGEST_TOKEN`.
1283/// The token's subject embeds the deployment_id; we reject mismatches.
1284pub async fn ingest_runtime_logs(
1285    State(state): State<AppState>,
1286    Path(deployment_id): Path<Uuid>,
1287    headers: HeaderMap,
1288    Json(payload): Json<IngestPayload>,
1289) -> ApiResult<Json<IngestResponse>> {
1290    // Bearer token from the in-container shipper.
1291    let auth = headers
1292        .get("Authorization")
1293        .and_then(|h| h.to_str().ok())
1294        .and_then(|h| h.strip_prefix("Bearer "))
1295        .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1296
1297    let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1298        auth,
1299        &state.config.jwt_secret,
1300    )
1301    .map_err(|e| {
1302        tracing::warn!(error = %e, "Deployment ingest token rejected");
1303        ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1304    })?;
1305
1306    if token_deployment_id != deployment_id {
1307        return Err(ApiError::InvalidRequest(
1308            "Token deployment id does not match URL path".to_string(),
1309        ));
1310    }
1311
1312    if payload.events.is_empty() {
1313        return Ok(Json(IngestResponse { accepted: 0 }));
1314    }
1315
1316    // Cap accepted batch size as a defence against runaway shippers — a
1317    // misbehaving container shouldn't be able to flood the table with one
1318    // request. Matches the shipper's default flush size.
1319    const MAX_BATCH: usize = 500;
1320    let events: Vec<RuntimeRequestEvent> = payload.events.into_iter().take(MAX_BATCH).collect();
1321    let accepted = events.len();
1322
1323    // Bulk insert. Building one INSERT with many VALUES rows would be
1324    // marginally faster but uglier with sqlx; the row count per batch is
1325    // small (50 by default), so per-row inserts in a transaction is fine.
1326    let pool = state.db.pool();
1327    let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1328    for evt in &events {
1329        sqlx::query(
1330            r#"
1331            INSERT INTO runtime_request_logs (
1332                deployment_id, occurred_at, method, path, status, latency_ms,
1333                matched_route, client_ip, user_agent, request_id, bytes_in, bytes_out
1334            )
1335            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
1336            "#,
1337        )
1338        .bind(deployment_id)
1339        .bind(evt.timestamp)
1340        .bind(&evt.method)
1341        .bind(&evt.path)
1342        .bind(evt.status as i16)
1343        .bind(evt.latency_ms as i32)
1344        .bind(evt.matched_route.as_ref())
1345        .bind(evt.client_ip.as_ref())
1346        .bind(evt.user_agent.as_ref())
1347        .bind(evt.request_id.as_ref())
1348        .bind(evt.bytes_in)
1349        .bind(evt.bytes_out)
1350        .execute(&mut *tx)
1351        .await
1352        .map_err(ApiError::Database)?;
1353    }
1354    tx.commit().await.map_err(ApiError::Database)?;
1355
1356    Ok(Json(IngestResponse { accepted }))
1357}
1358
1359// === Capture ingest (#234 part 2) ===
1360//
1361// The recorder ships completed exchanges (request + response) here. The
1362// wire format mirrors `mockforge_recorder::models::RecordedExchange` —
1363// we duplicate just the fields rather than depend on the recorder crate
1364// directly, since the registry server doesn't need any of the recorder's
1365// behaviour. Both sides round-trip through serde, so renames on the
1366// recorder side stay compatible as long as the JSON field names match.
1367
/// Request half of one recorder exchange shipped to `ingest_runtime_captures`.
///
/// Field names must match the recorder's JSON. Fields marked
/// `#[serde(default)]` become `None` when absent.
#[derive(Debug, Deserialize)]
pub struct CaptureIngestRequest {
    /// Recorder-assigned exchange ID. Persisted as `capture_id`; together with
    /// the deployment ID it is the conflict key that de-duplicates re-shipped
    /// exchanges.
    pub id: String,
    pub protocol: String,
    /// When the request was captured; persisted as `occurred_at`.
    pub timestamp: DateTime<Utc>,
    pub method: String,
    pub path: String,
    #[serde(default)]
    pub query_params: Option<String>,
    /// Request headers as a recorder-serialized string (format not visible
    /// here, presumably JSON; confirm). Persisted as `request_headers`.
    pub headers: String,
    #[serde(default)]
    pub body: Option<String>,
    /// How `body` is encoded; the value is chosen by the recorder.
    pub body_encoding: String,
    #[serde(default)]
    pub client_ip: Option<String>,
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub duration_ms: Option<i64>,
    #[serde(default)]
    pub status_code: Option<i32>,
    #[serde(default)]
    pub tags: Option<String>,
}
1394
/// Response half of one recorder exchange. Every field is persisted into the
/// matching `response_*` column of `runtime_captures`.
#[derive(Debug, Deserialize)]
pub struct CaptureIngestResponse {
    pub status_code: i32,
    /// Response headers as a recorder-serialized string.
    pub headers: String,
    #[serde(default)]
    pub body: Option<String>,
    /// How `body` is encoded; the value is chosen by the recorder.
    pub body_encoding: String,
    pub size_bytes: i64,
    pub timestamp: DateTime<Utc>,
}
1405
/// One completed (or still response-less) exchange from the recorder.
#[derive(Debug, Deserialize)]
pub struct CaptureIngestExchange {
    pub request: CaptureIngestRequest,
    /// `None` when the exchange has no response; the response columns are
    /// then stored as NULL.
    #[serde(default)]
    pub response: Option<CaptureIngestResponse>,
}
1412
/// Request body of `ingest_runtime_captures`: one batch of exchanges.
#[derive(Debug, Deserialize)]
pub struct CaptureIngestPayload {
    pub exchanges: Vec<CaptureIngestExchange>,
}
1417
1418/// Persist a batch of recorder captures shipped by an in-container
1419/// `CaptureCloudSyncHandle`. Same deployment-scoped JWT contract as the
1420/// log and OTLP ingest endpoints. Inserts use ON CONFLICT DO NOTHING so
1421/// that re-shipped exchanges (retries, container restarts with cached
1422/// buffer) don't produce duplicates.
1423pub async fn ingest_runtime_captures(
1424    State(state): State<AppState>,
1425    Path(deployment_id): Path<Uuid>,
1426    headers: HeaderMap,
1427    Json(payload): Json<CaptureIngestPayload>,
1428) -> ApiResult<Json<IngestResponse>> {
1429    let auth = headers
1430        .get("Authorization")
1431        .and_then(|h| h.to_str().ok())
1432        .and_then(|h| h.strip_prefix("Bearer "))
1433        .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1434
1435    let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1436        auth,
1437        &state.config.jwt_secret,
1438    )
1439    .map_err(|e| {
1440        tracing::warn!(error = %e, "Capture ingest token rejected");
1441        ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1442    })?;
1443
1444    if token_deployment_id != deployment_id {
1445        return Err(ApiError::InvalidRequest(
1446            "Token deployment id does not match URL path".to_string(),
1447        ));
1448    }
1449
1450    if payload.exchanges.is_empty() {
1451        return Ok(Json(IngestResponse { accepted: 0 }));
1452    }
1453
1454    // Capture rows are larger than log rows (full request + response
1455    // bodies), so the cap is tighter — a runaway shipper shouldn't fill
1456    // a single transaction with megabytes of payload.
1457    const MAX_BATCH: usize = 100;
1458    let exchanges: Vec<CaptureIngestExchange> =
1459        payload.exchanges.into_iter().take(MAX_BATCH).collect();
1460    let accepted = exchanges.len();
1461
1462    let pool = state.db.pool();
1463    let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1464    for exchange in &exchanges {
1465        let req = &exchange.request;
1466        let resp = exchange.response.as_ref();
1467        sqlx::query(
1468            r#"
1469            INSERT INTO runtime_captures (
1470                deployment_id, capture_id, protocol, occurred_at, method, path,
1471                query_params, request_headers, request_body, request_body_encoding,
1472                client_ip, trace_id, span_id, duration_ms, status_code, tags,
1473                response_status_code, response_headers, response_body,
1474                response_body_encoding, response_size_bytes, response_timestamp
1475            )
1476            VALUES (
1477                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
1478                $15, $16, $17, $18, $19, $20, $21, $22
1479            )
1480            ON CONFLICT (deployment_id, capture_id) DO NOTHING
1481            "#,
1482        )
1483        .bind(deployment_id)
1484        .bind(&req.id)
1485        .bind(&req.protocol)
1486        .bind(req.timestamp)
1487        .bind(&req.method)
1488        .bind(&req.path)
1489        .bind(req.query_params.as_ref())
1490        .bind(&req.headers)
1491        .bind(req.body.as_ref())
1492        .bind(&req.body_encoding)
1493        .bind(req.client_ip.as_ref())
1494        .bind(req.trace_id.as_ref())
1495        .bind(req.span_id.as_ref())
1496        .bind(req.duration_ms)
1497        .bind(req.status_code)
1498        .bind(req.tags.as_ref())
1499        .bind(resp.map(|r| r.status_code))
1500        .bind(resp.map(|r| r.headers.as_str()))
1501        .bind(resp.and_then(|r| r.body.as_deref()))
1502        .bind(resp.map(|r| r.body_encoding.as_str()))
1503        .bind(resp.map(|r| r.size_bytes))
1504        .bind(resp.map(|r| r.timestamp))
1505        .execute(&mut *tx)
1506        .await
1507        .map_err(ApiError::Database)?;
1508    }
1509    tx.commit().await.map_err(ApiError::Database)?;
1510
1511    Ok(Json(IngestResponse { accepted }))
1512}
1513
/// Query parameters for `get_runtime_requests`.
#[derive(Debug, Deserialize)]
pub struct RuntimeRequestsQuery {
    /// Maximum entries to return. Defaults to 100; capped at 500 server-side.
    pub limit: Option<u32>,
    /// RFC3339 timestamp; only entries strictly newer than this are returned.
    /// A value that does not parse is ignored (treated as absent).
    pub since: Option<String>,
}
1521
1522/// Read back recent runtime request logs for a deployment. Powers the
1523/// admin UI's "Requests" tab.
1524pub async fn get_runtime_requests(
1525    State(state): State<AppState>,
1526    AuthUser(user_id): AuthUser,
1527    headers: HeaderMap,
1528    Path(deployment_id): Path<Uuid>,
1529    Query(params): Query<RuntimeRequestsQuery>,
1530) -> ApiResult<Json<Vec<RuntimeRequestEvent>>> {
1531    let pool = state.db.pool();
1532
1533    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1534        .await
1535        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1536
1537    let deployment = HostedMock::find_by_id(pool, deployment_id)
1538        .await
1539        .map_err(ApiError::Database)?
1540        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1541
1542    if deployment.org_id != org_ctx.org_id {
1543        return Err(ApiError::InvalidRequest(
1544            "You don't have access to this deployment".to_string(),
1545        ));
1546    }
1547
1548    let limit = params.limit.unwrap_or(100).min(500) as i64;
1549    let since = params
1550        .since
1551        .as_deref()
1552        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1553        .map(|d| d.with_timezone(&Utc));
1554
1555    type RuntimeRequestRow = (
1556        DateTime<Utc>,
1557        String,
1558        String,
1559        i16,
1560        i32,
1561        Option<String>,
1562        Option<String>,
1563        Option<String>,
1564        Option<String>,
1565        Option<i64>,
1566        Option<i64>,
1567    );
1568
1569    let rows: Vec<RuntimeRequestRow> = if let Some(since) = since {
1570        sqlx::query_as(
1571            r#"
1572            SELECT occurred_at, method, path, status, latency_ms,
1573                   matched_route, client_ip, user_agent, request_id,
1574                   bytes_in, bytes_out
1575            FROM runtime_request_logs
1576            WHERE deployment_id = $1 AND occurred_at > $2
1577            ORDER BY occurred_at DESC
1578            LIMIT $3
1579            "#,
1580        )
1581        .bind(deployment_id)
1582        .bind(since)
1583        .bind(limit)
1584        .fetch_all(pool)
1585        .await
1586        .map_err(ApiError::Database)?
1587    } else {
1588        sqlx::query_as(
1589            r#"
1590            SELECT occurred_at, method, path, status, latency_ms,
1591                   matched_route, client_ip, user_agent, request_id,
1592                   bytes_in, bytes_out
1593            FROM runtime_request_logs
1594            WHERE deployment_id = $1
1595            ORDER BY occurred_at DESC
1596            LIMIT $2
1597            "#,
1598        )
1599        .bind(deployment_id)
1600        .bind(limit)
1601        .fetch_all(pool)
1602        .await
1603        .map_err(ApiError::Database)?
1604    };
1605
1606    let events: Vec<RuntimeRequestEvent> = rows
1607        .into_iter()
1608        .map(|row| RuntimeRequestEvent {
1609            timestamp: row.0,
1610            method: row.1,
1611            path: row.2,
1612            status: row.3 as u16,
1613            latency_ms: row.4 as u32,
1614            matched_route: row.5,
1615            client_ip: row.6,
1616            user_agent: row.7,
1617            request_id: row.8,
1618            bytes_in: row.9,
1619            bytes_out: row.10,
1620        })
1621        .collect();
1622
1623    Ok(Json(events))
1624}
1625
1626/// Proxy a request to the deployment's local recorder API.
1627///
1628/// The mockforge-recorder library is mounted on the deployed `http_app` at
1629/// `/api/recorder/*` (#234), but the deployed instance has no per-user auth
1630/// on those routes. This handler is the cloud-side gate: it verifies the
1631/// caller has access to the deployment, then forwards the request to the
1632/// deployment's internal URL.
1633///
1634/// Captures stay ephemeral on the deployment's local SQLite (Fly machines
1635/// don't have a persistent volume mounted by default). For long-term
1636/// retention we'd want either a Fly volume mount or a forwarder pattern
1637/// like the log shipper — both bigger and tracked separately.
1638async fn proxy_to_deployment_recorder(
1639    deployment: &HostedMock,
1640    path_and_query: &str,
1641) -> ApiResult<axum::http::Response<axum::body::Body>> {
1642    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1643    let Some(base) = base else {
1644        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1645    };
1646    let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1647
1648    let client = reqwest::Client::builder()
1649        .timeout(std::time::Duration::from_secs(10))
1650        .build()
1651        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1652
1653    let resp =
1654        client.get(&url).send().await.map_err(|e| {
1655            ApiError::Internal(anyhow::anyhow!("Recorder proxy fetch failed: {}", e))
1656        })?;
1657
1658    let status = resp.status();
1659    let headers = resp.headers().clone();
1660    let body = resp.bytes().await.map_err(|e| {
1661        ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
1662    })?;
1663
1664    let mut builder = axum::http::Response::builder().status(status);
1665    if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
1666        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
1667    }
1668    builder.body(axum::body::Body::from(body)).map_err(|e| {
1669        ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
1670    })
1671}
1672
1673/// Proxy a list/get to the deployment's state-machine API. Mirrors
1674/// `proxy_to_deployment_recorder` exactly — the only difference is the
1675/// upstream path is `/__mockforge/api/state-machines/*`. Kept as a thin
1676/// wrapper so the call sites read the same way as the recorder ones.
1677async fn proxy_to_deployment_state_machines(
1678    deployment: &HostedMock,
1679    path_and_query: &str,
1680) -> ApiResult<axum::http::Response<axum::body::Body>> {
1681    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1682    let Some(base) = base else {
1683        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1684    };
1685    let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1686
1687    let client = reqwest::Client::builder()
1688        .timeout(std::time::Duration::from_secs(10))
1689        .build()
1690        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1691
1692    let resp = client.get(&url).send().await.map_err(|e| {
1693        ApiError::Internal(anyhow::anyhow!("State-machine proxy fetch failed: {}", e))
1694    })?;
1695    let status = resp.status();
1696    let headers = resp.headers().clone();
1697    let body = resp.bytes().await.map_err(|e| {
1698        ApiError::Internal(anyhow::anyhow!("State-machine proxy read body failed: {}", e))
1699    })?;
1700
1701    let mut builder = axum::http::Response::builder().status(status);
1702    if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
1703        builder = builder.header(axum::http::header::CONTENT_TYPE, ct);
1704    }
1705    builder.body(axum::body::Body::from(body)).map_err(|e| {
1706        ApiError::Internal(anyhow::anyhow!("State-machine proxy response build failed: {}", e))
1707    })
1708}
1709
1710/// List state machines configured on a hosted-mock deployment. Proxies
1711/// `GET /__mockforge/api/state-machines` on the deployed instance.
1712pub async fn list_deployment_state_machines(
1713    State(state): State<AppState>,
1714    AuthUser(user_id): AuthUser,
1715    headers: HeaderMap,
1716    Path(deployment_id): Path<Uuid>,
1717) -> ApiResult<axum::http::Response<axum::body::Body>> {
1718    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1719    proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines").await
1720}
1721
1722/// Get a single state machine definition. Proxies the deployment's
1723/// `GET /__mockforge/api/state-machines/{resource_type}`.
1724pub async fn get_deployment_state_machine(
1725    State(state): State<AppState>,
1726    AuthUser(user_id): AuthUser,
1727    headers: HeaderMap,
1728    Path((deployment_id, resource_type)): Path<(Uuid, String)>,
1729) -> ApiResult<axum::http::Response<axum::body::Body>> {
1730    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1731    if resource_type.contains('/') || resource_type.contains('?') || resource_type.contains('#') {
1732        return Err(ApiError::InvalidRequest("Invalid resource type".to_string()));
1733    }
1734    let path = format!("/__mockforge/api/state-machines/{}", urlencoding::encode(&resource_type));
1735    proxy_to_deployment_state_machines(&deployment, &path).await
1736}
1737
1738/// List instances of state machines on the deployment. Proxies
1739/// `GET /__mockforge/api/state-machines/instances`.
1740pub async fn list_deployment_state_machine_instances(
1741    State(state): State<AppState>,
1742    AuthUser(user_id): AuthUser,
1743    headers: HeaderMap,
1744    Path(deployment_id): Path<Uuid>,
1745) -> ApiResult<axum::http::Response<axum::body::Body>> {
1746    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1747    proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines/instances")
1748        .await
1749}
1750
1751#[derive(Debug, Deserialize)]
1752pub struct RecorderCapturesQuery {
1753    pub limit: Option<u32>,
1754    pub since: Option<String>,
1755}
1756
1757/// List recently captured request/response pairs for a deployment.
1758///
1759/// Reads from the cloud-side `runtime_captures` mirror (#234 part 2)
1760/// when the deployment has any synced rows; falls back to proxying the
1761/// deployment's `/api/recorder/requests` for older deployments that
1762/// haven't been redeployed onto the cloud-sync image. Once a deployment
1763/// has shipped at least one capture, we never proxy again — the
1764/// Postgres path is faster (no extra hop) and durable across restart.
1765pub async fn list_recorder_captures(
1766    State(state): State<AppState>,
1767    AuthUser(user_id): AuthUser,
1768    headers: HeaderMap,
1769    Path(deployment_id): Path<Uuid>,
1770    Query(params): Query<RecorderCapturesQuery>,
1771) -> ApiResult<axum::http::Response<axum::body::Body>> {
1772    let pool = state.db.pool();
1773
1774    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1775        .await
1776        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1777
1778    let deployment = HostedMock::find_by_id(pool, deployment_id)
1779        .await
1780        .map_err(ApiError::Database)?
1781        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1782
1783    if deployment.org_id != org_ctx.org_id {
1784        return Err(ApiError::InvalidRequest(
1785            "You don't have access to this deployment".to_string(),
1786        ));
1787    }
1788
1789    // If this deployment has synced any captures, always serve from
1790    // Postgres. The EXISTS check is index-bound (single row probe on
1791    // the `(deployment_id, occurred_at DESC)` index) and adds <1ms.
1792    let has_synced: bool = sqlx::query_scalar(
1793        "SELECT EXISTS(SELECT 1 FROM runtime_captures WHERE deployment_id = $1 LIMIT 1)",
1794    )
1795    .bind(deployment_id)
1796    .fetch_one(pool)
1797    .await
1798    .map_err(ApiError::Database)?;
1799
1800    if has_synced {
1801        let limit = params.limit.unwrap_or(100).min(500) as i64;
1802        let since = params
1803            .since
1804            .as_deref()
1805            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1806            .map(|d| d.with_timezone(&Utc));
1807        let captures = list_cloud_captures(pool, deployment_id, limit, since).await?;
1808        let body = serde_json::to_vec(&captures).map_err(|e| {
1809            ApiError::InvalidRequest(format!("Failed to serialize captures: {}", e))
1810        })?;
1811        return Ok(axum::http::Response::builder()
1812            .status(axum::http::StatusCode::OK)
1813            .header("content-type", "application/json")
1814            .body(axum::body::Body::from(body))
1815            .unwrap());
1816    }
1817
1818    let mut qs = String::from("/api/recorder/requests");
1819    let mut sep = '?';
1820    if let Some(limit) = params.limit {
1821        qs.push(sep);
1822        qs.push_str(&format!("limit={}", limit));
1823        sep = '&';
1824    }
1825    if let Some(since) = params.since.as_deref() {
1826        qs.push(sep);
1827        qs.push_str(&format!("since={}", urlencoding::encode(since)));
1828    }
1829
1830    let _ = state;
1831    proxy_to_deployment_recorder(&deployment, &qs).await
1832}
1833
1834/// Cloud-Postgres list query mapped into the same shape the recorder
1835/// proxy emits. Kept private — callers go through `list_recorder_captures`.
1836async fn list_cloud_captures(
1837    pool: &sqlx::PgPool,
1838    deployment_id: Uuid,
1839    limit: i64,
1840    since: Option<DateTime<Utc>>,
1841) -> ApiResult<Vec<serde_json::Value>> {
1842    type Row = (
1843        String,         // capture_id
1844        String,         // protocol
1845        DateTime<Utc>,  // occurred_at
1846        String,         // method
1847        String,         // path
1848        Option<String>, // query_params
1849        String,         // request_headers
1850        Option<String>, // request_body
1851        String,         // request_body_encoding
1852        Option<String>, // client_ip
1853        Option<String>, // trace_id
1854        Option<String>, // span_id
1855        Option<i64>,    // duration_ms
1856        Option<i32>,    // status_code
1857        Option<String>, // tags
1858    );
1859
1860    let rows: Vec<Row> = if let Some(since) = since {
1861        sqlx::query_as(
1862            r#"
1863            SELECT capture_id, protocol, occurred_at, method, path, query_params,
1864                   request_headers, request_body, request_body_encoding,
1865                   client_ip, trace_id, span_id, duration_ms, status_code, tags
1866            FROM runtime_captures
1867            WHERE deployment_id = $1 AND occurred_at > $2
1868            ORDER BY occurred_at DESC
1869            LIMIT $3
1870            "#,
1871        )
1872        .bind(deployment_id)
1873        .bind(since)
1874        .bind(limit)
1875        .fetch_all(pool)
1876        .await
1877        .map_err(ApiError::Database)?
1878    } else {
1879        sqlx::query_as(
1880            r#"
1881            SELECT capture_id, protocol, occurred_at, method, path, query_params,
1882                   request_headers, request_body, request_body_encoding,
1883                   client_ip, trace_id, span_id, duration_ms, status_code, tags
1884            FROM runtime_captures
1885            WHERE deployment_id = $1
1886            ORDER BY occurred_at DESC
1887            LIMIT $2
1888            "#,
1889        )
1890        .bind(deployment_id)
1891        .bind(limit)
1892        .fetch_all(pool)
1893        .await
1894        .map_err(ApiError::Database)?
1895    };
1896
1897    let captures = rows
1898        .into_iter()
1899        .map(|r| {
1900            serde_json::json!({
1901                "id": r.0,
1902                "protocol": r.1,
1903                "timestamp": r.2,
1904                "method": r.3,
1905                "path": r.4,
1906                "query_params": r.5,
1907                "headers": r.6,
1908                "body": r.7,
1909                "body_encoding": r.8,
1910                "client_ip": r.9,
1911                "trace_id": r.10,
1912                "span_id": r.11,
1913                "duration_ms": r.12,
1914                "status_code": r.13,
1915                "tags": r.14,
1916            })
1917        })
1918        .collect();
1919    Ok(captures)
1920}
1921
1922/// Get a single capture by id. Cloud-Postgres-first; falls through to
1923/// the deployment proxy when the row hasn't synced.
1924pub async fn get_recorder_capture(
1925    State(state): State<AppState>,
1926    AuthUser(user_id): AuthUser,
1927    headers: HeaderMap,
1928    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
1929) -> ApiResult<axum::http::Response<axum::body::Body>> {
1930    let pool = state.db.pool();
1931
1932    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1933        .await
1934        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1935
1936    let deployment = HostedMock::find_by_id(pool, deployment_id)
1937        .await
1938        .map_err(ApiError::Database)?
1939        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1940
1941    if deployment.org_id != org_ctx.org_id {
1942        return Err(ApiError::InvalidRequest(
1943            "You don't have access to this deployment".to_string(),
1944        ));
1945    }
1946
1947    // Validate the capture id is path-safe — defence in depth even though
1948    // we already URL-encode below.
1949    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
1950        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
1951    }
1952
1953    if let Some(row) = fetch_cloud_capture(pool, deployment_id, &capture_id).await? {
1954        let body = serde_json::to_vec(&row)
1955            .map_err(|e| ApiError::InvalidRequest(format!("Failed to serialize capture: {}", e)))?;
1956        return Ok(axum::http::Response::builder()
1957            .status(axum::http::StatusCode::OK)
1958            .header("content-type", "application/json")
1959            .body(axum::body::Body::from(body))
1960            .unwrap());
1961    }
1962
1963    let path = format!("/api/recorder/requests/{}", urlencoding::encode(&capture_id));
1964    let _ = state;
1965    proxy_to_deployment_recorder(&deployment, &path).await
1966}
1967
1968/// Single-row counterpart to `list_cloud_captures`. Returns None when
1969/// the capture isn't in cloud Postgres — caller can then proxy.
1970async fn fetch_cloud_capture(
1971    pool: &sqlx::PgPool,
1972    deployment_id: Uuid,
1973    capture_id: &str,
1974) -> ApiResult<Option<serde_json::Value>> {
1975    type Row = (
1976        String,
1977        String,
1978        DateTime<Utc>,
1979        String,
1980        String,
1981        Option<String>,
1982        String,
1983        Option<String>,
1984        String,
1985        Option<String>,
1986        Option<String>,
1987        Option<String>,
1988        Option<i64>,
1989        Option<i32>,
1990        Option<String>,
1991    );
1992    let row: Option<Row> = sqlx::query_as(
1993        r#"
1994        SELECT capture_id, protocol, occurred_at, method, path, query_params,
1995               request_headers, request_body, request_body_encoding,
1996               client_ip, trace_id, span_id, duration_ms, status_code, tags
1997        FROM runtime_captures
1998        WHERE deployment_id = $1 AND capture_id = $2
1999        LIMIT 1
2000        "#,
2001    )
2002    .bind(deployment_id)
2003    .bind(capture_id)
2004    .fetch_optional(pool)
2005    .await
2006    .map_err(ApiError::Database)?;
2007    Ok(row.map(|r| {
2008        serde_json::json!({
2009            "id": r.0,
2010            "protocol": r.1,
2011            "timestamp": r.2,
2012            "method": r.3,
2013            "path": r.4,
2014            "query_params": r.5,
2015            "headers": r.6,
2016            "body": r.7,
2017            "body_encoding": r.8,
2018            "client_ip": r.9,
2019            "trace_id": r.10,
2020            "span_id": r.11,
2021            "duration_ms": r.12,
2022            "status_code": r.13,
2023            "tags": r.14,
2024        })
2025    }))
2026}
2027
2028/// Get the response body associated with a capture. Recorder splits the
2029/// request and response on separate endpoints so callers can paginate
2030/// requests cheaply without dragging response payloads along.
2031pub async fn get_recorder_capture_response(
2032    State(state): State<AppState>,
2033    AuthUser(user_id): AuthUser,
2034    headers: HeaderMap,
2035    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2036) -> ApiResult<axum::http::Response<axum::body::Body>> {
2037    let pool = state.db.pool();
2038
2039    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2040        .await
2041        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2042
2043    let deployment = HostedMock::find_by_id(pool, deployment_id)
2044        .await
2045        .map_err(ApiError::Database)?
2046        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2047
2048    if deployment.org_id != org_ctx.org_id {
2049        return Err(ApiError::InvalidRequest(
2050            "You don't have access to this deployment".to_string(),
2051        ));
2052    }
2053
2054    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2055        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2056    }
2057
2058    if let Some(row) = fetch_cloud_capture_response(pool, deployment_id, &capture_id).await? {
2059        let body = serde_json::to_vec(&row).map_err(|e| {
2060            ApiError::InvalidRequest(format!("Failed to serialize response: {}", e))
2061        })?;
2062        return Ok(axum::http::Response::builder()
2063            .status(axum::http::StatusCode::OK)
2064            .header("content-type", "application/json")
2065            .body(axum::body::Body::from(body))
2066            .unwrap());
2067    }
2068
2069    let path = format!("/api/recorder/requests/{}/response", urlencoding::encode(&capture_id));
2070    let _ = state;
2071    proxy_to_deployment_recorder(&deployment, &path).await
2072}
2073
2074/// Fetch the response side of a capture from cloud Postgres. Returns
2075/// None when the row isn't synced or the response side hasn't been
2076/// recorded yet (request-only exchanges).
2077async fn fetch_cloud_capture_response(
2078    pool: &sqlx::PgPool,
2079    deployment_id: Uuid,
2080    capture_id: &str,
2081) -> ApiResult<Option<serde_json::Value>> {
2082    type Row = (Option<i32>, Option<String>, Option<String>, Option<String>, Option<i64>);
2083    let row: Option<Row> = sqlx::query_as(
2084        r#"
2085        SELECT response_status_code, response_headers, response_body,
2086               response_body_encoding, response_size_bytes
2087        FROM runtime_captures
2088        WHERE deployment_id = $1 AND capture_id = $2
2089        LIMIT 1
2090        "#,
2091    )
2092    .bind(deployment_id)
2093    .bind(capture_id)
2094    .fetch_optional(pool)
2095    .await
2096    .map_err(ApiError::Database)?;
2097    Ok(row.and_then(|r| {
2098        let status_code = r.0?;
2099        Some(serde_json::json!({
2100            "status_code": status_code,
2101            "headers": r.1.unwrap_or_else(|| "{}".to_string()),
2102            "body": r.2,
2103            "body_encoding": r.3.unwrap_or_else(|| "utf8".to_string()),
2104            "size_bytes": r.4.unwrap_or(0),
2105        }))
2106    }))
2107}
2108
2109/// Proxy a POST to the deployment's recorder API. Used by the
2110/// enable/disable/clear mutations below — same auth model as the GET
2111/// proxies (user JWT gates access; the deployment itself has no
2112/// per-user auth on `/api/recorder/*`).
2113async fn proxy_post_to_deployment_recorder(
2114    deployment: &HostedMock,
2115    path: &str,
2116) -> ApiResult<axum::http::Response<axum::body::Body>> {
2117    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2118    let Some(base) = base else {
2119        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2120    };
2121    let url = format!("{}{}", base.trim_end_matches('/'), path);
2122
2123    let client = reqwest::Client::builder()
2124        .timeout(std::time::Duration::from_secs(10))
2125        .build()
2126        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2127
2128    let resp =
2129        client.post(&url).send().await.map_err(|e| {
2130            ApiError::Internal(anyhow::anyhow!("Recorder proxy POST failed: {}", e))
2131        })?;
2132
2133    let status = resp.status();
2134    let headers = resp.headers().clone();
2135    let body = resp.bytes().await.map_err(|e| {
2136        ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
2137    })?;
2138
2139    let mut builder = axum::http::Response::builder().status(status);
2140    if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
2141        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2142    }
2143    builder.body(axum::body::Body::from(body)).map_err(|e| {
2144        ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
2145    })
2146}
2147
2148async fn check_org_access(
2149    state: &AppState,
2150    user_id: Uuid,
2151    headers: &HeaderMap,
2152    deployment_id: Uuid,
2153) -> ApiResult<HostedMock> {
2154    let pool = state.db.pool();
2155
2156    let org_ctx = resolve_org_context(state, user_id, headers, None)
2157        .await
2158        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2159
2160    let deployment = HostedMock::find_by_id(pool, deployment_id)
2161        .await
2162        .map_err(ApiError::Database)?
2163        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2164
2165    if deployment.org_id != org_ctx.org_id {
2166        return Err(ApiError::InvalidRequest(
2167            "You don't have access to this deployment".to_string(),
2168        ));
2169    }
2170    Ok(deployment)
2171}
2172
2173/// Get current recorder enabled state. Proxies GET /api/recorder/status so
2174/// the UI's Captures tab can render the toggle as a real read+write — the
2175/// Enable/Disable buttons used to be fire-and-forget with no read side.
2176pub async fn get_recorder_status(
2177    State(state): State<AppState>,
2178    AuthUser(user_id): AuthUser,
2179    headers: HeaderMap,
2180    Path(deployment_id): Path<Uuid>,
2181) -> ApiResult<axum::http::Response<axum::body::Body>> {
2182    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2183    proxy_to_deployment_recorder(&deployment, "/api/recorder/status").await
2184}
2185
2186/// Enable recording on the deployment. Proxies POST /api/recorder/enable.
2187pub async fn enable_recorder(
2188    State(state): State<AppState>,
2189    AuthUser(user_id): AuthUser,
2190    headers: HeaderMap,
2191    Path(deployment_id): Path<Uuid>,
2192) -> ApiResult<axum::http::Response<axum::body::Body>> {
2193    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2194    proxy_post_to_deployment_recorder(&deployment, "/api/recorder/enable").await
2195}
2196
2197/// Disable recording on the deployment. Proxies POST /api/recorder/disable.
2198pub async fn disable_recorder(
2199    State(state): State<AppState>,
2200    AuthUser(user_id): AuthUser,
2201    headers: HeaderMap,
2202    Path(deployment_id): Path<Uuid>,
2203) -> ApiResult<axum::http::Response<axum::body::Body>> {
2204    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2205    proxy_post_to_deployment_recorder(&deployment, "/api/recorder/disable").await
2206}
2207
2208/// Clear all captures on the deployment. Proxies DELETE /api/recorder/clear.
2209/// Note: the deployment's clear endpoint is DELETE; we POST through to it
2210/// here because typing the verb at the cloud layer doesn't change much
2211/// and POST is friendlier for browser fetch without preflight.
2212pub async fn clear_recorder(
2213    State(state): State<AppState>,
2214    AuthUser(user_id): AuthUser,
2215    headers: HeaderMap,
2216    Path(deployment_id): Path<Uuid>,
2217) -> ApiResult<axum::http::Response<axum::body::Body>> {
2218    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2219    // Build a DELETE on the wire since that's what the recorder defines.
2220    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2221    let Some(base) = base else {
2222        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2223    };
2224    let url = format!("{}/api/recorder/clear", base.trim_end_matches('/'));
2225    let client = reqwest::Client::builder()
2226        .timeout(std::time::Duration::from_secs(10))
2227        .build()
2228        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2229    let resp =
2230        client.delete(&url).send().await.map_err(|e| {
2231            ApiError::Internal(anyhow::anyhow!("Recorder clear proxy failed: {}", e))
2232        })?;
2233    let status = resp.status();
2234    let headers_resp = resp.headers().clone();
2235    let body = resp.bytes().await.map_err(|e| {
2236        ApiError::Internal(anyhow::anyhow!("Recorder clear read body failed: {}", e))
2237    })?;
2238    let mut builder = axum::http::Response::builder().status(status);
2239    if let Some(content_type) = headers_resp.get(axum::http::header::CONTENT_TYPE) {
2240        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2241    }
2242    builder.body(axum::body::Body::from(body)).map_err(|e| {
2243        ApiError::Internal(anyhow::anyhow!("Recorder clear response build failed: {}", e))
2244    })
2245}
2246
2247/// Replay a captured request against the deployment. The recorder
2248/// records the request envelope; replay re-executes it and returns the
2249/// fresh response — useful for "did the bug we captured get fixed yet"
2250/// or "check whether a chaos rule still triggers."
2251pub async fn replay_recorder_capture(
2252    State(state): State<AppState>,
2253    AuthUser(user_id): AuthUser,
2254    headers: HeaderMap,
2255    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2256) -> ApiResult<axum::http::Response<axum::body::Body>> {
2257    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2258    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2259        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2260    }
2261    let path = format!("/api/recorder/replay/{}", urlencoding::encode(&capture_id));
2262    proxy_post_to_deployment_recorder(&deployment, &path).await
2263}
2264
2265/// Export the deployment's recorder captures as HAR. Proxies the
2266/// recorder's existing `/api/recorder/export/har` endpoint and forwards
2267/// the response unchanged. The browser handles the download via a blob
2268/// URL on the UI side.
2269pub async fn export_recorder_captures_har(
2270    State(state): State<AppState>,
2271    AuthUser(user_id): AuthUser,
2272    headers: HeaderMap,
2273    Path(deployment_id): Path<Uuid>,
2274) -> ApiResult<axum::http::Response<axum::body::Body>> {
2275    let pool = state.db.pool();
2276
2277    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2278        .await
2279        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2280
2281    let deployment = HostedMock::find_by_id(pool, deployment_id)
2282        .await
2283        .map_err(ApiError::Database)?
2284        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2285
2286    if deployment.org_id != org_ctx.org_id {
2287        return Err(ApiError::InvalidRequest(
2288            "You don't have access to this deployment".to_string(),
2289        ));
2290    }
2291
2292    let _ = state;
2293    proxy_to_deployment_recorder(&deployment, "/api/recorder/export/har").await
2294}
2295
2296/// Export the deployment's recorder captures as JSONL — one
2297/// `RecordedExchange` per line, all protocols. Companion to the HAR
2298/// export. JSONL is the format the local `mockforge-cli replay` reads,
2299/// so this is the round-trip path: capture in cloud → download → replay
2300/// locally.
2301pub async fn export_recorder_captures_jsonl(
2302    State(state): State<AppState>,
2303    AuthUser(user_id): AuthUser,
2304    headers: HeaderMap,
2305    Path(deployment_id): Path<Uuid>,
2306) -> ApiResult<axum::http::Response<axum::body::Body>> {
2307    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2308    proxy_to_deployment_recorder(&deployment, "/api/recorder/export/jsonl").await
2309}
2310
2311/// Get deployment metrics
2312pub async fn get_deployment_metrics(
2313    State(state): State<AppState>,
2314    AuthUser(user_id): AuthUser,
2315    headers: HeaderMap,
2316    Path(deployment_id): Path<Uuid>,
2317) -> ApiResult<Json<MetricsResponse>> {
2318    let pool = state.db.pool();
2319
2320    // Resolve org context
2321    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2322        .await
2323        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2324
2325    // Get deployment
2326    let deployment = HostedMock::find_by_id(pool, deployment_id)
2327        .await
2328        .map_err(ApiError::Database)?
2329        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2330
2331    // Verify access
2332    if deployment.org_id != org_ctx.org_id {
2333        return Err(ApiError::InvalidRequest(
2334            "You don't have access to this deployment".to_string(),
2335        ));
2336    }
2337
2338    // Prefer live metrics from Fly Managed Prometheus when configured.
2339    // Falls back to the local `deployment_metrics` table when:
2340    //   * Fly Prometheus env vars aren't set (dev / self-hosted), or
2341    //   * the query fails (transient network / auth error).
2342    if let Some(client) = crate::fly_metrics::global() {
2343        let app_name = deployment.fly_app_name();
2344        match client.snapshot_for_app(&app_name).await {
2345            Ok(snap) => {
2346                use chrono::Datelike;
2347                let now = Utc::now().date_naive();
2348                let period_start =
2349                    chrono::NaiveDate::from_ymd_opt(now.year(), now.month(), 1).unwrap_or(now);
2350                return Ok(Json(MetricsResponse {
2351                    requests: snap.requests,
2352                    requests_2xx: snap.requests_2xx,
2353                    requests_4xx: snap.requests_4xx,
2354                    requests_5xx: snap.requests_5xx,
2355                    egress_bytes: snap.egress_bytes,
2356                    avg_response_time_ms: snap.avg_response_time_ms,
2357                    period_start,
2358                }));
2359            }
2360            Err(err) => {
2361                tracing::warn!(
2362                    app_name = %app_name,
2363                    error = %err,
2364                    "Fly Prometheus metrics query failed; falling back to local counters"
2365                );
2366            }
2367        }
2368    }
2369
2370    // Fallback: return the local aggregate counters. Until the in-container
2371    // log shipper lands (#232) this table has no writer and returns zeros.
2372    let metrics = DeploymentMetrics::get_or_create_current(pool, deployment_id)
2373        .await
2374        .map_err(ApiError::Database)?;
2375
2376    Ok(Json(MetricsResponse::from(metrics)))
2377}
2378
2379/// Upload an OpenAPI spec file for use in a hosted mock deployment
2380pub async fn upload_spec(
2381    State(state): State<AppState>,
2382    AuthUser(user_id): AuthUser,
2383    headers: HeaderMap,
2384    mut multipart: Multipart,
2385) -> ApiResult<Json<SpecUploadResponse>> {
2386    // Resolve org context
2387    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2388        .await
2389        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2390
2391    // Check permission
2392    let checker = PermissionChecker::new(&state);
2393    checker
2394        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2395        .await?;
2396
2397    // Extract file from multipart
2398    let mut file_data: Option<Vec<u8>> = None;
2399    let mut file_name = String::from("spec");
2400
2401    while let Some(field) = multipart
2402        .next_field()
2403        .await
2404        .map_err(|e| ApiError::InvalidRequest(format!("Failed to read multipart field: {}", e)))?
2405    {
2406        if field.name() == Some("file") || field.name() == Some("spec") {
2407            if let Some(name) = field.file_name() {
2408                file_name =
2409                    name.to_string().replace(".yaml", "").replace(".yml", "").replace(".json", "");
2410            }
2411            let data = field.bytes().await.map_err(|e| {
2412                ApiError::InvalidRequest(format!("Failed to read file data: {}", e))
2413            })?;
2414
2415            // Validate it's valid JSON or YAML OpenAPI spec
2416            let content = String::from_utf8(data.to_vec()).map_err(|_| {
2417                ApiError::InvalidRequest("File must be valid UTF-8 text".to_string())
2418            })?;
2419
2420            // Try to parse as JSON first, then YAML
2421            let spec_value: serde_json::Value =
2422                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&content) {
2423                    v
2424                } else if let Ok(v) = serde_yaml::from_str::<serde_json::Value>(&content) {
2425                    v
2426                } else {
2427                    return Err(ApiError::InvalidRequest(
2428                        "File must be a valid JSON or YAML OpenAPI specification".to_string(),
2429                    ));
2430                };
2431
2432            // Basic OpenAPI validation - check for required fields
2433            if spec_value.get("openapi").is_none() && spec_value.get("swagger").is_none() {
2434                return Err(ApiError::InvalidRequest(
2435                    "File must contain an 'openapi' or 'swagger' field".to_string(),
2436                ));
2437            }
2438
2439            // Always store as JSON
2440            let json_data = serde_json::to_vec_pretty(&spec_value).map_err(|e| {
2441                ApiError::Internal(anyhow::anyhow!("Failed to serialize spec: {}", e))
2442            })?;
2443
2444            file_data = Some(json_data);
2445        }
2446    }
2447
2448    let data = file_data.ok_or_else(|| {
2449        ApiError::InvalidRequest("No 'file' or 'spec' field in upload".to_string())
2450    })?;
2451
2452    // Upload to storage
2453    let url = state
2454        .storage
2455        .upload_spec(&org_ctx.org_id.to_string(), &file_name, data)
2456        .await
2457        .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to upload spec: {}", e)))?;
2458
2459    Ok(Json(SpecUploadResponse { url }))
2460}
2461
/// Response body of `upload_spec`.
///
/// Carries the URL returned by the storage backend for the uploaded spec.
/// `upload_spec` always stores the spec re-serialized as JSON.
#[derive(Debug, Serialize)]
pub struct SpecUploadResponse {
    /// Location of the stored spec, exactly as returned by
    /// `state.storage.upload_spec`.
    pub url: String,
}
2466
2467// Request/Response types
2468
/// JSON body accepted by `create_deployment`.
#[derive(Debug, Deserialize)]
pub struct CreateDeploymentRequest {
    /// Name of the deployment.
    pub name: String,
    /// Optional slug. Slugs are used in hostnames such as `<slug>.<domain>`.
    /// NOTE(review): how a missing slug is derived is decided in
    /// `create_deployment`; confirm there.
    pub slug: Option<String>,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Optional project the deployment belongs to.
    pub project_id: Option<Uuid>,
    /// Deployment configuration. `enabled_protocols` and `upstream_url`
    /// below are persisted into this object.
    pub config_json: serde_json::Value,
    /// URL of the OpenAPI spec to serve, e.g. one returned by `upload_spec`.
    pub openapi_spec_url: Option<String>,
    /// Requested region. The default applied when omitted is chosen
    /// outside this type.
    pub region: Option<String>,
    /// Protocols to expose on the deployment. HTTP is implicit and always
    /// included. Items beyond Free-tier (gRPC, brokers) require a higher
    /// plan and are rejected at create time if the org isn't entitled.
    /// Persisted into `config_json["enabled_protocols"]`.
    #[serde(default)]
    pub enabled_protocols: Option<Vec<crate::models::Protocol>>,
    /// Optional upstream URL the deployment proxies to when the reality
    /// slider is > 0 (#222). When unset, the slider is a no-op and
    /// responses always come from the mock. Persisted into
    /// `config_json["upstream_url"]`.
    #[serde(default)]
    pub upstream_url: Option<String>,
}
2491
/// JSON body for updating a deployment's status.
///
/// NOTE(review): the consuming handler is outside this view. `status` is
/// presumably parsed as a deployment status string; confirm there.
#[derive(Debug, Deserialize)]
pub struct UpdateStatusRequest {
    /// New status, as a string.
    pub status: String,
    /// Optional error detail to record alongside the status.
    pub error_message: Option<String>,
    /// Optional public URL of the deployment.
    pub deployment_url: Option<String>,
    /// Optional internal URL of the deployment.
    pub internal_url: Option<String>,
}
2499
/// API representation of a hosted mock deployment, built from a
/// [`HostedMock`] row via its `From` impl.
#[derive(Debug, Serialize)]
pub struct DeploymentResponse {
    /// Deployment id.
    pub id: Uuid,
    /// Owning organization.
    pub org_id: Uuid,
    /// Optional project the deployment belongs to.
    pub project_id: Option<Uuid>,
    /// Deployment name.
    pub name: String,
    /// Slug used in hostnames such as `<slug>.<domain>`.
    pub slug: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// `HostedMock::status()` rendered with `to_string()`.
    pub status: String,
    /// Public URL, if one has been assigned.
    pub deployment_url: Option<String>,
    /// URL of the OpenAPI spec the deployment serves, if any.
    pub openapi_spec_url: Option<String>,
    /// Region the deployment runs in.
    pub region: String,
    /// Instance type the deployment runs on.
    pub instance_type: String,
    /// `HostedMock::health_status()` rendered with `to_string()`.
    pub health_status: String,
    /// Last recorded error message, if any.
    pub error_message: Option<String>,
    /// Protocols enabled on the deployment, as reported by
    /// `HostedMock::enabled_protocols()`.
    pub enabled_protocols: Vec<crate::models::Protocol>,
    /// Upstream URL the deployment proxies to when the reality slider is > 0.
    /// Persisted inside `config_json["upstream_url"]`; surfaced here so the
    /// UI can display and (eventually) edit it without reparsing config_json.
    pub upstream_url: Option<String>,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Last update timestamp.
    pub updated_at: DateTime<Utc>,
}
2523
2524impl From<HostedMock> for DeploymentResponse {
2525    fn from(mock: HostedMock) -> Self {
2526        let status = mock.status().to_string();
2527        let health_status = mock.health_status().to_string();
2528        let enabled_protocols = mock.enabled_protocols();
2529        let upstream_url = mock.upstream_url();
2530        Self {
2531            id: mock.id,
2532            org_id: mock.org_id,
2533            project_id: mock.project_id,
2534            name: mock.name,
2535            slug: mock.slug,
2536            description: mock.description,
2537            status,
2538            deployment_url: mock.deployment_url,
2539            openapi_spec_url: mock.openapi_spec_url,
2540            region: mock.region,
2541            instance_type: mock.instance_type,
2542            health_status,
2543            error_message: mock.error_message,
2544            enabled_protocols,
2545            upstream_url,
2546            created_at: mock.created_at,
2547            updated_at: mock.updated_at,
2548        }
2549    }
2550}
2551
/// API representation of one deployment log entry, built from a
/// [`DeploymentLog`] row.
#[derive(Debug, Serialize)]
pub struct LogResponse {
    /// Log entry id.
    pub id: Uuid,
    /// Level string as stored (e.g. `"info"`, which this file writes).
    pub level: String,
    /// Log message text.
    pub message: String,
    /// Structured metadata, taken from the row's `metadata_json`.
    pub metadata: serde_json::Value,
    /// When the entry was recorded.
    pub created_at: DateTime<Utc>,
}
2560
2561impl From<DeploymentLog> for LogResponse {
2562    fn from(log: DeploymentLog) -> Self {
2563        Self {
2564            id: log.id,
2565            level: log.level,
2566            message: log.message,
2567            metadata: log.metadata_json,
2568            created_at: log.created_at,
2569        }
2570    }
2571}
2572
/// Request/traffic counters for a deployment over one period.
///
/// Filled either from Fly Prometheus metrics or from the local
/// `DeploymentMetrics` aggregate.
#[derive(Debug, Serialize)]
pub struct MetricsResponse {
    /// Total request count.
    pub requests: i64,
    /// Requests answered with a 2xx status.
    pub requests_2xx: i64,
    /// Requests answered with a 4xx status.
    pub requests_4xx: i64,
    /// Requests answered with a 5xx status.
    pub requests_5xx: i64,
    /// Egress volume; the name indicates bytes.
    pub egress_bytes: i64,
    /// Average response time; the name indicates milliseconds.
    pub avg_response_time_ms: i64,
    /// Start of the reporting period. The Fly-metrics path sets this to the
    /// first day of the current month.
    pub period_start: chrono::NaiveDate,
}
2583
2584impl From<DeploymentMetrics> for MetricsResponse {
2585    fn from(metrics: DeploymentMetrics) -> Self {
2586        Self {
2587            requests: metrics.requests,
2588            requests_2xx: metrics.requests_2xx,
2589            requests_4xx: metrics.requests_4xx,
2590            requests_5xx: metrics.requests_5xx,
2591            egress_bytes: metrics.egress_bytes,
2592            avg_response_time_ms: metrics.avg_response_time_ms,
2593            period_start: metrics.period_start,
2594        }
2595    }
2596}
2597
/// JSON body for `set_domain`.
#[derive(Debug, Deserialize)]
pub struct SetDomainRequest {
    /// Parent domain. The deployment is exposed as `<slug>.<domain>`.
    pub domain: String,
}
2602
2603/// Set a custom domain for a deployment.
2604///
2605/// Adds a TLS certificate on the registry server Fly.io app so that
2606/// `<slug>.<domain>` terminates TLS here, then the proxy fallback
2607/// handler forwards traffic to the deployment's internal URL.
2608pub async fn set_domain(
2609    State(state): State<AppState>,
2610    AuthUser(user_id): AuthUser,
2611    headers: HeaderMap,
2612    Path(deployment_id): Path<Uuid>,
2613    Json(request): Json<SetDomainRequest>,
2614) -> ApiResult<Json<serde_json::Value>> {
2615    let pool = state.db.pool();
2616
2617    // Resolve org context
2618    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2619        .await
2620        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2621
2622    // Check permission
2623    let checker = PermissionChecker::new(&state);
2624    checker
2625        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2626        .await?;
2627
2628    // Get deployment
2629    let deployment = HostedMock::find_by_id(pool, deployment_id)
2630        .await
2631        .map_err(ApiError::Database)?
2632        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2633
2634    // Verify ownership
2635    if deployment.org_id != org_ctx.org_id {
2636        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2637    }
2638
2639    let hostname = format!("{}.{}", deployment.slug, request.domain);
2640
2641    // Update deployment URL to use the custom domain. A wildcard TLS cert
2642    // on the registry app covers all subdomains, so no per-deployment
2643    // certificate management is needed. Persist `custom_domain` in
2644    // metadata_json so we have an authoritative read-side for the lifecycle
2645    // endpoints below — `deployment_url` alone is ambiguous (it could be a
2646    // MOCKFORGE_MOCKS_DOMAIN-based default).
2647    let new_url = format!("https://{}", hostname);
2648    sqlx::query(
2649        r#"
2650        UPDATE hosted_mocks
2651        SET deployment_url = $1,
2652            metadata_json = jsonb_set(
2653                COALESCE(metadata_json, '{}'::jsonb),
2654                '{custom_domain}',
2655                to_jsonb($2::text)
2656            ),
2657            updated_at = NOW()
2658        WHERE id = $3
2659        "#,
2660    )
2661    .bind(&new_url)
2662    .bind(&hostname)
2663    .bind(deployment_id)
2664    .execute(pool)
2665    .await
2666    .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to update deployment URL: {}", e)))?;
2667
2668    DeploymentLog::create(
2669        pool,
2670        deployment_id,
2671        "info",
2672        &format!("Custom domain set: {}", hostname),
2673        None,
2674    )
2675    .await
2676    .ok();
2677
2678    Ok(Json(serde_json::json!({
2679        "hostname": hostname,
2680        "deployment_url": new_url,
2681    })))
2682}
2683
2684/// Read the currently bound custom domain for a deployment, if any. Returns
2685/// `{ "hostname": null }` when the deployment is on its default URL.
2686pub async fn get_custom_domain(
2687    State(state): State<AppState>,
2688    AuthUser(user_id): AuthUser,
2689    headers: HeaderMap,
2690    Path(deployment_id): Path<Uuid>,
2691) -> ApiResult<Json<serde_json::Value>> {
2692    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2693    let hostname = deployment
2694        .metadata_json
2695        .get("custom_domain")
2696        .and_then(|v| v.as_str())
2697        .map(|s| s.to_string());
2698    Ok(Json(serde_json::json!({
2699        "hostname": hostname,
2700        "deployment_url": deployment.deployment_url,
2701    })))
2702}
2703
2704/// Remove the custom domain mapping. Reverts `deployment_url` to the
2705/// MOCKFORGE_MOCKS_DOMAIN-based default if configured, or the Fly.io
2706/// default `https://<fly_app>.fly.dev` otherwise. The wildcard TLS cert
2707/// on the registry app stays — there is no per-deployment cert to clean
2708/// up because the set path never created one.
2709pub async fn clear_custom_domain(
2710    State(state): State<AppState>,
2711    AuthUser(user_id): AuthUser,
2712    headers: HeaderMap,
2713    Path(deployment_id): Path<Uuid>,
2714) -> ApiResult<Json<serde_json::Value>> {
2715    let pool = state.db.pool();
2716
2717    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2718        .await
2719        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2720
2721    let checker = PermissionChecker::new(&state);
2722    checker
2723        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2724        .await?;
2725
2726    let deployment = HostedMock::find_by_id(pool, deployment_id)
2727        .await
2728        .map_err(ApiError::Database)?
2729        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2730    if deployment.org_id != org_ctx.org_id {
2731        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2732    }
2733
2734    let app_name = deployment.fly_app_name();
2735    let default_url = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
2736        format!("https://{}.{}", deployment.slug, domain)
2737    } else {
2738        format!("https://{}.fly.dev", app_name)
2739    };
2740
2741    sqlx::query(
2742        r#"
2743        UPDATE hosted_mocks
2744        SET deployment_url = $1,
2745            metadata_json = COALESCE(metadata_json, '{}'::jsonb) - 'custom_domain',
2746            updated_at = NOW()
2747        WHERE id = $2
2748        "#,
2749    )
2750    .bind(&default_url)
2751    .bind(deployment_id)
2752    .execute(pool)
2753    .await
2754    .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to clear custom domain: {}", e)))?;
2755
2756    DeploymentLog::create(pool, deployment_id, "info", "Custom domain removed", None)
2757        .await
2758        .ok();
2759
2760    Ok(Json(serde_json::json!({
2761        "hostname": serde_json::Value::Null,
2762        "deployment_url": default_url,
2763    })))
2764}
2765
2766// ─── Smoke-test trigger (Issue #392) ─────────────────────────────────
2767
/// Optional overrides on the smoke run. Both fields fall back to the
/// runner's executor defaults when omitted (5s latency budget, GET-only).
///
/// Wire names are camelCase (`latencyBudgetMs`, `methods`) and every field
/// may be omitted. The whole body is optional too: `trigger_smoke_run` uses
/// `TriggerSmokeRunRequest::default()` when no body is sent.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct TriggerSmokeRunRequest {
    /// Per-route latency assertion ceiling, in milliseconds.
    pub latency_budget_ms: Option<u64>,
    /// HTTP methods to probe. Defaults to `["GET"]` at the executor.
    /// Currently only GET has been thought through — POST/PUT/PATCH need
    /// a body source which v1 doesn't have.
    pub methods: Option<Vec<String>>,
}
2780
2781/// `POST /api/v1/hosted-mocks/{deployment_id}/smoke-runs`
2782///
2783/// Triggers a smoke test against a hosted-mock deployment. Reuses the
2784/// existing `test_runs` lifecycle with `kind = "smoke"` so smokes share
2785/// the runner pool, concurrency cap, and runner_seconds metering with
2786/// every other run kind. The runner-side `SmokeTestExecutor` (see
2787/// `crates/mockforge-test-runner/src/executors/smoke.rs`) walks the
2788/// deployment's OpenAPI spec, probes each declared route against the
2789/// deployment's public URL, and reports `route_pass` / `route_fail`
2790/// events back via the internal callbacks.
2791///
2792/// Authorization: caller must hold `Permission::HostedMockUpdate` on
2793/// the deployment's org. Cross-org access surfaces as
2794/// "Deployment not found" rather than "forbidden" (matches the
2795/// convention in `delete_deployment`).
2796///
2797/// Failure modes (all 400 InvalidRequest):
2798///   - Deployment not found / not in caller's org.
2799///   - Deployment is not in `running` status — running smoke against a
2800///     deployment that's still provisioning or has crashed gives
2801///     misleading red routes that aren't actually regressions.
2802///   - Deployment has no `deployment_url` (still being provisioned).
2803///   - Deployment has no `openapi_spec_url` (no spec uploaded yet).
2804pub async fn trigger_smoke_run(
2805    State(state): State<AppState>,
2806    AuthUser(user_id): AuthUser,
2807    Path(deployment_id): Path<Uuid>,
2808    headers: HeaderMap,
2809    body: Option<Json<TriggerSmokeRunRequest>>,
2810) -> ApiResult<Json<TestRun>> {
2811    let req = body.map(|Json(r)| r).unwrap_or_default();
2812
2813    // ─── Auth + deployment lookup ────────────────────────────────
2814    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2815        .await
2816        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2817
2818    let checker = PermissionChecker::new(&state);
2819    checker
2820        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
2821        .await?;
2822
2823    let deployment = state
2824        .store
2825        .find_hosted_mock_by_id(deployment_id)
2826        .await?
2827        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2828    if deployment.org_id != org_ctx.org_id {
2829        // Hide existence from non-members of the deployment's org.
2830        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2831    }
2832    if deployment.status != "active" {
2833        // `active` is the canonical "deployment is up + serving" status
2834        // (see `DeploymentStatus::Active` in mockforge-registry-core).
2835        // Anything else (`pending`, `deploying`, `stopped`, `failed`,
2836        // `deleting`) means we'd be probing a deployment that isn't
2837        // actually serving traffic — every route would surface as a
2838        // misleading red.
2839        return Err(ApiError::InvalidRequest(format!(
2840            "Deployment is in '{}' status; smoke runs require 'active'",
2841            deployment.status,
2842        )));
2843    }
2844
2845    // ─── Plan-limit gate (shared with every other test_run kind) ──
2846    let limits = crate::handlers::usage::effective_limits(&state, &org_ctx.org).await?;
2847    let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
2848    if max_concurrent == 0 {
2849        return Err(ApiError::ResourceLimitExceeded(
2850            "Test execution is not enabled on this plan".into(),
2851        ));
2852    }
2853    if max_concurrent > 0 {
2854        let inflight = TestRun::count_inflight(state.db.pool(), org_ctx.org_id)
2855            .await
2856            .map_err(ApiError::Database)?;
2857        if inflight.total() >= max_concurrent {
2858            return Err(ApiError::ResourceLimitExceeded(format!(
2859                "Concurrent run limit reached ({}/{}).",
2860                inflight.total(),
2861                max_concurrent,
2862            )));
2863        }
2864    }
2865
2866    // ─── Build the runner payload ────────────────────────────────
2867    let payload = build_smoke_payload(&deployment, &req)?;
2868
2869    // ─── Enqueue test_runs row ───────────────────────────────────
2870    let run = TestRun::enqueue(
2871        state.db.pool(),
2872        EnqueueTestRun {
2873            suite_id: deployment.id,
2874            org_id: org_ctx.org_id,
2875            kind: "smoke",
2876            triggered_by: "manual",
2877            triggered_by_user: Some(user_id),
2878            git_ref: None,
2879            git_sha: None,
2880        },
2881    )
2882    .await
2883    .map_err(ApiError::Database)?;
2884
2885    // ─── Push onto the Redis queue for the runner ────────────────
2886    if let Err(e) = crate::run_queue::enqueue(
2887        state.redis.as_ref(),
2888        crate::run_queue::EnqueuedJob {
2889            run_id: run.id,
2890            org_id: run.org_id,
2891            source_id: deployment.id,
2892            kind: "smoke",
2893            payload,
2894        },
2895    )
2896    .await
2897    {
2898        // Match the chaos handler's behaviour: log the failure but still
2899        // return the queued row. The runner will pick it up when the
2900        // queue is healthy again, and the test_runs status reflects that
2901        // it never left 'queued' in the meantime.
2902        tracing::error!(run_id = %run.id, error = %e, "failed to enqueue smoke run");
2903    }
2904
2905    Ok(Json(run))
2906}
2907
2908/// Build the JSON payload the runner's `SmokeTestExecutor` consumes.
2909/// Pre-flight validates that the deployment has the URLs the executor
2910/// needs (base + spec) so a "missing field" doesn't surface as a
2911/// runner-side `errored` run after queueing.
2912fn build_smoke_payload(
2913    deployment: &HostedMock,
2914    req: &TriggerSmokeRunRequest,
2915) -> ApiResult<serde_json::Value> {
2916    let base_url =
2917        deployment.deployment_url.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
2918            ApiError::InvalidRequest(
2919                "Deployment has no public URL — wait for the deploy to finish before running smoke"
2920                    .to_string(),
2921            )
2922        })?;
2923    let spec_url =
2924        deployment
2925            .openapi_spec_url
2926            .as_deref()
2927            .filter(|s| !s.is_empty())
2928            .ok_or_else(|| {
2929                ApiError::InvalidRequest(
2930                    "Deployment has no OpenAPI spec — upload one before running smoke".to_string(),
2931                )
2932            })?;
2933
2934    let mut payload = serde_json::json!({
2935        "deployment_id": deployment.id,
2936        "base_url": base_url,
2937        "openapi_spec_url": spec_url,
2938    });
2939    let obj = payload
2940        .as_object_mut()
2941        .expect("payload was constructed as an object on the line above");
2942
2943    if let Some(budget) = req.latency_budget_ms {
2944        obj.insert("latency_budget_ms".into(), budget.into());
2945    }
2946    if let Some(methods) = req.methods.as_ref() {
2947        // `to_value` on a Vec<String> is infallible in practice, but
2948        // bail with a clean error rather than panic if it ever isn't.
2949        let v = serde_json::to_value(methods)
2950            .map_err(|e| ApiError::InvalidRequest(format!("invalid methods array: {e}")))?;
2951        obj.insert("methods".into(), v);
2952    }
2953
2954    Ok(payload)
2955}
2956
#[cfg(test)]
mod smoke_trigger_tests {
    use super::*;

    /// Build a `HostedMock` whose URL fields are under test control.
    ///
    /// `build_smoke_payload` does not read `status`; the `active` gate lives
    /// in `trigger_smoke_run`. The tests still pass the canonical `"active"`
    /// so the fixtures describe a deployment that could really be smoked.
    fn deployment_with(
        status: &str,
        deployment_url: Option<&str>,
        spec_url: Option<&str>,
    ) -> HostedMock {
        // Construct the full struct rather than `..Default::default()`:
        // HostedMock doesn't impl Default, and listing every field
        // explicitly means a future schema column addition triggers a
        // compile error here so the test gets a chance to opt in.
        HostedMock {
            id: Uuid::new_v4(),
            org_id: Uuid::new_v4(),
            project_id: None,
            name: "test".to_string(),
            slug: "test".to_string(),
            description: None,
            config_json: serde_json::json!({}),
            openapi_spec_url: spec_url.map(String::from),
            status: status.to_string(),
            deployment_url: deployment_url.map(String::from),
            internal_url: None,
            region: "iad".to_string(),
            instance_type: "shared-cpu-1x".to_string(),
            health_check_url: None,
            last_health_check: None,
            health_status: "unknown".to_string(),
            error_message: None,
            metadata_json: serde_json::json!({}),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            deleted_at: None,
        }
    }

    #[test]
    fn build_smoke_payload_uses_deployment_urls() {
        let dep = deployment_with(
            "active",
            Some("https://my-mock.fly.dev"),
            Some("https://specs.example.com/abc.json"),
        );
        let req = TriggerSmokeRunRequest::default();
        let payload = build_smoke_payload(&dep, &req).unwrap();
        assert_eq!(payload["base_url"], "https://my-mock.fly.dev");
        assert_eq!(payload["openapi_spec_url"], "https://specs.example.com/abc.json");
        assert_eq!(payload["deployment_id"], serde_json::json!(dep.id));
        assert!(payload.get("latency_budget_ms").is_none());
        assert!(payload.get("methods").is_none());
    }

    #[test]
    fn build_smoke_payload_passes_overrides() {
        let dep = deployment_with("active", Some("https://x"), Some("https://y"));
        let req = TriggerSmokeRunRequest {
            latency_budget_ms: Some(2000),
            methods: Some(vec!["GET".into(), "HEAD".into()]),
        };
        let payload = build_smoke_payload(&dep, &req).unwrap();
        assert_eq!(payload["latency_budget_ms"], 2000);
        assert_eq!(payload["methods"], serde_json::json!(["GET", "HEAD"]));
    }

    #[test]
    fn build_smoke_payload_rejects_missing_deployment_url() {
        let dep = deployment_with("active", None, Some("https://y"));
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("public URL")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    #[test]
    fn build_smoke_payload_rejects_empty_deployment_url() {
        let dep = deployment_with("active", Some(""), Some("https://y"));
        assert!(matches!(
            build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()),
            Err(ApiError::InvalidRequest(_))
        ));
    }

    #[test]
    fn build_smoke_payload_rejects_missing_spec_url() {
        let dep = deployment_with("active", Some("https://x"), None);
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    #[test]
    fn build_smoke_payload_rejects_empty_spec_url() {
        let dep = deployment_with("active", Some("https://x"), Some(""));
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    #[test]
    fn trigger_request_deserializes_camel_case_with_defaults() {
        // Every field is optional on the wire.
        let empty: TriggerSmokeRunRequest = serde_json::from_str("{}").unwrap();
        assert!(empty.latency_budget_ms.is_none());
        assert!(empty.methods.is_none());

        // Field names are camelCase on the wire.
        let full: TriggerSmokeRunRequest =
            serde_json::from_str(r#"{"latencyBudgetMs": 1500, "methods": ["GET"]}"#).unwrap();
        assert_eq!(full.latency_budget_ms, Some(1500));
        assert_eq!(full.methods, Some(vec!["GET".to_string()]));
    }
}