Skip to main content

mockforge_registry_server/handlers/
hosted_mocks.rs

1//! Hosted Mocks deployment handlers
2//!
3//! Provides endpoints for deploying, managing, and monitoring cloud-hosted mock services
4
5use axum::{
6    extract::{Multipart, Path, Query, State},
7    http::HeaderMap,
8    response::sse::{Event, KeepAlive, Sse},
9    Json,
10};
11use chrono::{DateTime, Utc};
12use futures_util::stream::Stream;
13use serde::{Deserialize, Serialize};
14use std::convert::Infallible;
15use uuid::Uuid;
16
17use crate::{
18    deployment::flyio::FlyioClient,
19    error::{ApiError, ApiResult},
20    fly_logs::LogEntry,
21    middleware::{
22        permission_check::PermissionChecker, permissions::Permission, resolve_org_context, AuthUser,
23    },
24    models::{
25        feature_usage::FeatureType, AuditEventType, DeploymentLog, DeploymentMetrics,
26        DeploymentStatus, HostedMock, Subscription, SubscriptionStatus, TestRun,
27    },
28    AppState,
29};
30use mockforge_registry_core::models::test_run::EnqueueTestRun;
31use tracing::warn;
32
33/// Create a new hosted mock deployment
34pub async fn create_deployment(
35    State(state): State<AppState>,
36    AuthUser(user_id): AuthUser,
37    headers: HeaderMap,
38    Json(request): Json<CreateDeploymentRequest>,
39) -> ApiResult<Json<DeploymentResponse>> {
40    let pool = state.db.pool();
41
42    // Resolve org context
43    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
44        .await
45        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
46
47    // Check permission
48    let checker = PermissionChecker::new(&state);
49    checker
50        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
51        .await?;
52
53    // Block deployment for past_due subscriptions whose payment has been
54    // outstanding for >24h — per #449 acceptance criterion. The earlier
55    // implementation (PR #479) blocked immediately on the first
56    // `invoice.payment_failed` webhook, which was stricter than the spec
57    // called for and made customers feel cut off the moment Stripe's first
58    // retry failed (often a transient card-network blip). The 24h grace
59    // window matches what most SaaS providers do during Stripe's
60    // smart-retry sequence.
61    //
62    // We use `subscription.updated_at` as a proxy for "when the status
63    // flipped to past_due" because every state-change webhook
64    // (`customer.subscription.updated`, `invoice.payment_failed`,
65    // `invoice.payment_succeeded`) calls `Subscription::upsert_from_stripe`
66    // which touches `updated_at`. There's a small accuracy gap if an
67    // unrelated write happens during the grace window, but for the 24h
68    // threshold it's good enough; a future PR can add an explicit
69    // `past_due_since` column if billing wants finer control.
70    if let Some(subscription) = Subscription::find_by_org(pool, org_ctx.org_id).await? {
71        if subscription.status() == SubscriptionStatus::PastDue {
72            const PAST_DUE_GRACE_SECONDS: i64 = 24 * 60 * 60;
73            let elapsed = (Utc::now() - subscription.updated_at).num_seconds();
74            if elapsed > PAST_DUE_GRACE_SECONDS {
75                // 402 (not 400) — billing-state failure is distinct from a
76                // malformed request. Matches #449 acceptance criterion 8.
77                return Err(ApiError::PaymentRequired(
78                    "Subscription has been past due for over 24 hours. Please update your payment method in the billing portal before deploying new mocks.".to_string(),
79                ));
80            }
81            tracing::info!(
82                org_id = %org_ctx.org_id,
83                past_due_seconds = elapsed,
84                "past_due subscription within 24h grace; allowing deploy"
85            );
86        }
87    }
88
89    // Check plan limits
90    let limits = &org_ctx.org.limits_json;
91    let max_hosted_mocks = limits.get("max_hosted_mocks").and_then(|v| v.as_i64()).unwrap_or(0);
92
93    if max_hosted_mocks >= 0 {
94        // Count existing active deployments
95        let existing = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
96
97        let active_count = existing
98            .iter()
99            .filter(|m| {
100                matches!(m.status(), DeploymentStatus::Active | DeploymentStatus::Deploying)
101            })
102            .count();
103
104        if active_count as i64 >= max_hosted_mocks {
105            return Err(ApiError::InvalidRequest(format!(
106                "Hosted mocks limit exceeded. Your plan allows {} active deployments. Upgrade to deploy more.",
107                max_hosted_mocks
108            )));
109        }
110    }
111
112    // Validate slug format
113    let generated_slug;
114    let slug = match request.slug.as_deref() {
115        Some(s) => s,
116        None => {
117            generated_slug = request
118                .name
119                .to_lowercase()
120                .chars()
121                .map(|c| {
122                    if c.is_alphanumeric() || c == '-' {
123                        c
124                    } else {
125                        '-'
126                    }
127                })
128                .collect::<String>()
129                .trim_matches('-')
130                .replace("--", "-");
131            &generated_slug
132        }
133    };
134
135    if !slug.chars().all(|c| c.is_alphanumeric() || c == '-') {
136        return Err(ApiError::InvalidRequest(
137            "Slug must contain only alphanumeric characters and hyphens".to_string(),
138        ));
139    }
140
141    // Check if slug is already taken
142    if state.store.find_hosted_mock_by_slug(org_ctx.org_id, slug).await?.is_some() {
143        return Err(ApiError::InvalidRequest(format!(
144            "A deployment with slug '{}' already exists",
145            slug
146        )));
147    }
148
149    // Resolve and plan-gate the protocol set. HTTP is always present and is
150    // free; gRPC requires Pro; brokers (SMTP/MQTT/Kafka/AMQP/TCP) require
151    // Team. Reject with an explicit error so the UI can surface "upgrade to
152    // enable X" rather than a generic 400.
153    let mut enabled_protocols = request.enabled_protocols.clone().unwrap_or_default();
154    if !enabled_protocols.contains(&crate::models::Protocol::Http) {
155        enabled_protocols.insert(0, crate::models::Protocol::Http);
156    }
157    let plan = org_ctx.org.plan.clone();
158    if !crate::models::protocols_allowed_on_plan(&enabled_protocols, &plan) {
159        let blocked: Vec<String> = enabled_protocols
160            .iter()
161            .filter(|p| !crate::models::protocols_allowed_on_plan(&[**p], &plan))
162            .map(|p| format!("{:?}", p))
163            .collect();
164        return Err(ApiError::InvalidRequest(format!(
165            "These protocols require a higher plan than '{}': {}",
166            plan,
167            blocked.join(", ")
168        )));
169    }
170
171    // Persist enabled protocols and upstream_url into the deployment's
172    // config_json so the orchestrator can read them when building the
173    // Fly machine. Additive merge preserves any keys the caller supplied.
174    let mut config_json = request.config_json.clone();
175    if !config_json.is_object() {
176        config_json = serde_json::json!({});
177    }
178    if let Some(obj) = config_json.as_object_mut() {
179        obj.insert(
180            "enabled_protocols".to_string(),
181            serde_json::to_value(&enabled_protocols).unwrap_or(serde_json::Value::Null),
182        );
183        if let Some(upstream) = request.upstream_url.as_ref() {
184            let trimmed = upstream.trim();
185            if !trimmed.is_empty() {
186                obj.insert(
187                    "upstream_url".to_string(),
188                    serde_json::Value::String(trimmed.to_string()),
189                );
190            }
191        }
192    }
193
194    // Create deployment record
195    let deployment = state
196        .store
197        .create_hosted_mock(
198            org_ctx.org_id,
199            request.project_id,
200            &request.name,
201            slug,
202            request.description.as_deref(),
203            config_json,
204            request.openapi_spec_url.as_deref(),
205            request.region.as_deref(),
206        )
207        .await?;
208
209    // Log deployment creation
210    DeploymentLog::create(
211        pool,
212        deployment.id,
213        "info",
214        "Deployment created",
215        Some(serde_json::json!({
216            "name": request.name,
217            "slug": slug,
218        })),
219    )
220    .await
221    .map_err(ApiError::Database)?;
222
223    // Mark as pending - the deployment orchestrator will pick it up and deploy it
224    // The orchestrator polls for pending/deploying deployments every 10 seconds
225    state
226        .store
227        .update_hosted_mock_status(deployment.id, DeploymentStatus::Pending, None)
228        .await?;
229
230    // Track feature usage
231    state
232        .store
233        .record_feature_usage(
234            org_ctx.org_id,
235            Some(user_id),
236            FeatureType::HostedMockDeploy,
237            Some(serde_json::json!({
238                "deployment_id": deployment.id,
239                "name": request.name,
240                "slug": slug,
241            })),
242        )
243        .await;
244
245    // Record audit log
246    let ip_address = headers
247        .get("X-Forwarded-For")
248        .or_else(|| headers.get("X-Real-IP"))
249        .and_then(|h| h.to_str().ok())
250        .map(|s| s.split(',').next().unwrap_or(s).trim());
251    let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
252
253    state
254        .store
255        .record_audit_event(
256            org_ctx.org_id,
257            Some(user_id),
258            AuditEventType::DeploymentCreated,
259            format!("Hosted mock deployment '{}' created", request.name),
260            Some(serde_json::json!({
261                "deployment_id": deployment.id,
262                "name": request.name,
263                "slug": slug,
264                "project_id": request.project_id,
265            })),
266            ip_address,
267            user_agent,
268        )
269        .await;
270
271    // Return deployment info
272    let deployment = state.store.find_hosted_mock_by_id(deployment.id).await?.ok_or_else(|| {
273        ApiError::Internal(anyhow::anyhow!("Failed to retrieve created deployment"))
274    })?;
275
276    Ok(Json(DeploymentResponse::from(deployment)))
277}
278
279/// List all deployments for the organization
280pub async fn list_deployments(
281    State(state): State<AppState>,
282    AuthUser(user_id): AuthUser,
283    headers: HeaderMap,
284) -> ApiResult<Json<Vec<DeploymentResponse>>> {
285    // Resolve org context
286    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
287        .await
288        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
289
290    // Get all deployments
291    let deployments = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
292
293    let responses: Vec<DeploymentResponse> =
294        deployments.into_iter().map(DeploymentResponse::from).collect();
295
296    Ok(Json(responses))
297}
298
299/// Get deployment details
300pub async fn get_deployment(
301    State(state): State<AppState>,
302    AuthUser(user_id): AuthUser,
303    headers: HeaderMap,
304    Path(deployment_id): Path<Uuid>,
305) -> ApiResult<Json<DeploymentResponse>> {
306    // Resolve org context
307    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
308        .await
309        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
310
311    // Get deployment
312    let deployment = state
313        .store
314        .find_hosted_mock_by_id(deployment_id)
315        .await?
316        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
317
318    // Verify access
319    if deployment.org_id != org_ctx.org_id {
320        return Err(ApiError::InvalidRequest(
321            "You don't have access to this deployment".to_string(),
322        ));
323    }
324
325    Ok(Json(DeploymentResponse::from(deployment)))
326}
327
328/// Update deployment status (internal/admin use)
329pub async fn update_deployment_status(
330    State(state): State<AppState>,
331    AuthUser(user_id): AuthUser,
332    headers: HeaderMap,
333    Path(deployment_id): Path<Uuid>,
334    Json(request): Json<UpdateStatusRequest>,
335) -> ApiResult<Json<DeploymentResponse>> {
336    // Resolve org context
337    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
338        .await
339        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
340
341    // Check permission
342    let checker = PermissionChecker::new(&state);
343    checker
344        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
345        .await?;
346
347    // Get deployment
348    let deployment = state
349        .store
350        .find_hosted_mock_by_id(deployment_id)
351        .await?
352        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
353
354    // Verify access
355    if deployment.org_id != org_ctx.org_id {
356        return Err(ApiError::InvalidRequest(
357            "You don't have access to this deployment".to_string(),
358        ));
359    }
360
361    // Update status
362    let status = DeploymentStatus::from_str(&request.status)
363        .ok_or_else(|| ApiError::InvalidRequest("Invalid status".to_string()))?;
364
365    state
366        .store
367        .update_hosted_mock_status(deployment_id, status, request.error_message.as_deref())
368        .await?;
369
370    // Update URLs if provided
371    if request.deployment_url.is_some() || request.internal_url.is_some() {
372        state
373            .store
374            .update_hosted_mock_urls(
375                deployment_id,
376                request.deployment_url.as_deref(),
377                request.internal_url.as_deref(),
378            )
379            .await?;
380    }
381
382    // Get updated deployment
383    let deployment = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
384        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
385    })?;
386
387    // Send deployment status notification email (non-blocking)
388    if let Ok(Some(org)) = state.store.find_organization_by_id(deployment.org_id).await {
389        if let Ok(Some(owner)) = state.store.find_user_by_id(org.owner_id).await {
390            let status_str = format!("{:?}", deployment.status()).to_lowercase();
391            let email_msg = crate::email::EmailService::generate_deployment_status_email(
392                &owner.username,
393                &owner.email,
394                &deployment.name,
395                &status_str,
396                deployment.deployment_url.as_deref(),
397                request.error_message.as_deref(),
398            );
399
400            tokio::spawn(async move {
401                match crate::email::EmailService::from_env() {
402                    Ok(email_service) => {
403                        if let Err(e) = email_service.send(email_msg).await {
404                            tracing::warn!("Failed to send deployment status email: {}", e);
405                        }
406                    }
407                    Err(e) => {
408                        tracing::warn!("Failed to create email service: {}", e);
409                    }
410                }
411            });
412        }
413    }
414
415    Ok(Json(DeploymentResponse::from(deployment)))
416}
417
418/// Delete a deployment
419pub async fn delete_deployment(
420    State(state): State<AppState>,
421    AuthUser(user_id): AuthUser,
422    headers: HeaderMap,
423    Path(deployment_id): Path<Uuid>,
424) -> ApiResult<Json<serde_json::Value>> {
425    let pool = state.db.pool();
426
427    // Resolve org context
428    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
429        .await
430        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
431
432    // Check permission
433    let checker = PermissionChecker::new(&state);
434    checker
435        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockDelete)
436        .await?;
437
438    // Get deployment
439    let deployment = state
440        .store
441        .find_hosted_mock_by_id(deployment_id)
442        .await?
443        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
444
445    // Verify access
446    if deployment.org_id != org_ctx.org_id {
447        return Err(ApiError::InvalidRequest(
448            "You don't have access to this deployment".to_string(),
449        ));
450    }
451
452    // Record audit log before deletion
453    let ip_address = headers
454        .get("X-Forwarded-For")
455        .or_else(|| headers.get("X-Real-IP"))
456        .and_then(|h| h.to_str().ok())
457        .map(|s| s.split(',').next().unwrap_or(s).trim());
458    let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
459
460    state
461        .store
462        .record_audit_event(
463            org_ctx.org_id,
464            Some(user_id),
465            AuditEventType::DeploymentDeleted,
466            format!("Hosted mock deployment '{}' deleted", deployment.name),
467            Some(serde_json::json!({
468                "deployment_id": deployment.id,
469                "name": deployment.name,
470                "slug": deployment.slug,
471            })),
472            ip_address,
473            user_agent,
474        )
475        .await;
476
477    // Update status to deleting before cleanup
478    state
479        .store
480        .update_hosted_mock_status(deployment_id, DeploymentStatus::Deleting, None)
481        .await?;
482
483    // Trigger actual deletion (stop service, cleanup resources, etc.)
484    // Try to delete from Fly.io if configured
485    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
486        let flyio_client = FlyioClient::new(flyio_token);
487
488        // Generate app name (same as in orchestrator)
489        let app_name = format!(
490            "mockforge-{}-{}",
491            deployment
492                .org_id
493                .to_string()
494                .replace("-", "")
495                .chars()
496                .take(8)
497                .collect::<String>(),
498            deployment.slug
499        );
500
501        // Try to get machine_id from metadata
502        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
503
504        if let Some(machine_id) = machine_id {
505            // Delete the specific machine
506            match flyio_client.delete_machine(&app_name, machine_id).await {
507                Ok(_) => {
508                    DeploymentLog::create(
509                        pool,
510                        deployment_id,
511                        "info",
512                        &format!("Deleted Fly.io machine: {}", machine_id),
513                        None,
514                    )
515                    .await
516                    .ok();
517                }
518                Err(e) => {
519                    warn!("Failed to delete Fly.io machine {}: {}", machine_id, e);
520                    DeploymentLog::create(
521                        pool,
522                        deployment_id,
523                        "warning",
524                        &format!("Failed to delete Fly.io machine: {}", e),
525                        None,
526                    )
527                    .await
528                    .ok();
529                    // Continue with app deletion and database deletion
530                }
531            }
532        } else {
533            // Machine ID not found in metadata, try to list and delete all machines
534            warn!(
535                "Machine ID not found in metadata for deployment {}, attempting to list machines",
536                deployment_id
537            );
538            match flyio_client.list_machines(&app_name).await {
539                Ok(machines) => {
540                    for machine in machines {
541                        if let Err(e) = flyio_client.delete_machine(&app_name, &machine.id).await {
542                            warn!("Failed to delete Fly.io machine {}: {}", machine.id, e);
543                        } else {
544                            DeploymentLog::create(
545                                pool,
546                                deployment_id,
547                                "info",
548                                &format!("Deleted Fly.io machine: {}", machine.id),
549                                None,
550                            )
551                            .await
552                            .ok();
553                        }
554                    }
555                }
556                Err(e) => {
557                    warn!("Failed to list Fly.io machines for app {}: {}", app_name, e);
558                    // Continue with app deletion and database deletion
559                }
560            }
561        }
562
563        // Delete the Fly.io app itself to avoid empty apps piling up
564        match flyio_client.delete_app(&app_name).await {
565            Ok(_) => {
566                DeploymentLog::create(
567                    pool,
568                    deployment_id,
569                    "info",
570                    &format!("Deleted Fly.io app: {}", app_name),
571                    None,
572                )
573                .await
574                .ok();
575            }
576            Err(e) => {
577                warn!("Failed to delete Fly.io app {}: {}", app_name, e);
578            }
579        }
580    }
581
582    // Soft delete from database
583    state.store.delete_hosted_mock(deployment_id).await?;
584
585    DeploymentLog::create(pool, deployment_id, "info", "Deployment deleted successfully", None)
586        .await
587        .ok(); // Log but don't fail on log error
588
589    Ok(Json(serde_json::json!({
590        "success": true,
591        "message": "Deployment deleted"
592    })))
593}
594
/// Request body for redeployment (all fields optional)
///
/// The redeploy handler accepts a missing body and falls back to
/// `Default`, i.e. both fields `None`. A field left as `None` keeps the
/// value currently stored for the deployment.
#[derive(Debug, Deserialize, Default)]
pub struct RedeployRequest {
    /// Replacement for the deployment's stored `config_json`; when present
    /// it overwrites the existing value (it is not merged).
    pub config_json: Option<serde_json::Value>,
    /// Replacement for the deployment's stored `openapi_spec_url`.
    pub openapi_spec_url: Option<String>,
}
603
604/// Redeploy an existing hosted mock deployment
605///
606/// Updates the machine image and optionally the spec, then restarts.
607pub async fn redeploy_deployment(
608    State(state): State<AppState>,
609    AuthUser(user_id): AuthUser,
610    headers: HeaderMap,
611    Path(deployment_id): Path<Uuid>,
612    body: Option<Json<RedeployRequest>>,
613) -> ApiResult<Json<serde_json::Value>> {
614    let pool = state.db.pool();
615
616    // Resolve org context
617    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
618        .await
619        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
620
621    // Check permission (reuse deploy permission)
622    let checker = PermissionChecker::new(&state);
623    checker
624        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
625        .await?;
626    // pool kept for DeploymentLog + the spawned redeploy orchestration task below
627
628    // Get existing deployment
629    let deployment = state
630        .store
631        .find_hosted_mock_by_id(deployment_id)
632        .await?
633        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
634
635    // Verify ownership
636    if deployment.org_id != org_ctx.org_id {
637        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
638    }
639
640    // Only allow redeployment of active or failed deployments
641    let status = deployment.status();
642    if !matches!(status, DeploymentStatus::Active | DeploymentStatus::Failed) {
643        return Err(ApiError::InvalidRequest(format!(
644            "Cannot redeploy a deployment with status '{}'. Must be 'active' or 'failed'.",
645            status
646        )));
647    }
648
649    // Update spec if provided
650    let request = body.map(|b| b.0).unwrap_or_default();
651    if request.config_json.is_some() || request.openapi_spec_url.is_some() {
652        let mut query = String::from("UPDATE hosted_mocks SET updated_at = NOW()");
653        let mut param_count = 0;
654
655        if request.config_json.is_some() {
656            param_count += 1;
657            query.push_str(&format!(", config_json = ${}", param_count));
658        }
659        if request.openapi_spec_url.is_some() {
660            param_count += 1;
661            query.push_str(&format!(", openapi_spec_url = ${}", param_count));
662        }
663        param_count += 1;
664        query.push_str(&format!(" WHERE id = ${}", param_count));
665
666        let mut q = sqlx::query(&query);
667        if let Some(ref config) = request.config_json {
668            q = q.bind(config);
669        }
670        if let Some(ref spec_url) = request.openapi_spec_url {
671            q = q.bind(spec_url);
672        }
673        q = q.bind(deployment_id);
674        q.execute(pool).await.map_err(|e| {
675            ApiError::Internal(anyhow::anyhow!("Failed to update deployment: {}", e))
676        })?;
677    }
678
679    // Update status to deploying
680    state
681        .store
682        .update_hosted_mock_status(deployment_id, DeploymentStatus::Deploying, None)
683        .await?;
684
685    DeploymentLog::create(pool, deployment_id, "info", "Redeployment initiated", None)
686        .await
687        .ok();
688
689    // Trigger redeployment in background
690    let pool_clone = pool.clone();
691    let deployment_id_clone = deployment_id;
692    tokio::spawn(async move {
693        let pool = &pool_clone;
694
695        // Fetch the updated deployment
696        let updated_deployment = match HostedMock::find_by_id(pool, deployment_id_clone).await {
697            Ok(Some(d)) => d,
698            Ok(None) => {
699                tracing::error!("Deployment {} not found during redeploy", deployment_id_clone);
700                return;
701            }
702            Err(e) => {
703                tracing::error!(
704                    "Failed to fetch deployment {} for redeploy: {}",
705                    deployment_id_clone,
706                    e
707                );
708                return;
709            }
710        };
711
712        // Try to redeploy via Fly.io if configured
713        if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
714            let flyio_client = FlyioClient::new(flyio_token);
715
716            let machine_id = updated_deployment
717                .metadata_json
718                .get("flyio_machine_id")
719                .and_then(|v| v.as_str());
720
721            if let Some(machine_id) = machine_id {
722                let app_name = format!(
723                    "mockforge-{}-{}",
724                    updated_deployment
725                        .org_id
726                        .to_string()
727                        .replace('-', "")
728                        .chars()
729                        .take(8)
730                        .collect::<String>(),
731                    updated_deployment.slug
732                );
733
734                // Build updated machine config
735                let mut env = std::collections::HashMap::new();
736                env.insert(
737                    "MOCKFORGE_DEPLOYMENT_ID".to_string(),
738                    updated_deployment.id.to_string(),
739                );
740                env.insert("MOCKFORGE_ORG_ID".to_string(), updated_deployment.org_id.to_string());
741                if let Ok(config_str) = serde_json::to_string(&updated_deployment.config_json) {
742                    env.insert("MOCKFORGE_CONFIG".to_string(), config_str);
743                }
744                env.insert("PORT".to_string(), "3000".to_string());
745
746                if let Some(ref spec_url) = updated_deployment.openapi_spec_url {
747                    env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
748                }
749
750                let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
751                    .unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
752
753                use crate::deployment::flyio::{
754                    FlyioCheck, FlyioMachineConfig, FlyioPort, FlyioRegistryAuth, FlyioService,
755                };
756
757                let services = vec![FlyioService {
758                    protocol: "tcp".to_string(),
759                    internal_port: 3000,
760                    ports: vec![
761                        FlyioPort {
762                            port: 80,
763                            handlers: vec!["http".to_string()],
764                        },
765                        FlyioPort {
766                            port: 443,
767                            handlers: vec!["tls".to_string(), "http".to_string()],
768                        },
769                    ],
770                }];
771
772                let mut checks = std::collections::HashMap::new();
773                checks.insert(
774                    "alive".to_string(),
775                    FlyioCheck {
776                        check_type: "http".to_string(),
777                        port: 3000,
778                        grace_period: "10s".to_string(),
779                        interval: "15s".to_string(),
780                        method: "GET".to_string(),
781                        timeout: "2s".to_string(),
782                        tls_skip_verify: false,
783                        path: Some("/health/live".to_string()),
784                    },
785                );
786
787                let machine_config = FlyioMachineConfig {
788                    image,
789                    env,
790                    services,
791                    checks: Some(checks),
792                    // None = accept Fly's API default
793                    // (shared-cpu-1x:256MB) — same as other call
794                    // sites. Plugin-enabled redeploys will set this
795                    // via FlyioGuest::for_hosted_mock in Phase 2.
796                    guest: None,
797                };
798
799                // Build registry auth
800                let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
801                    std::env::var("DOCKER_REGISTRY_SERVER"),
802                    std::env::var("DOCKER_REGISTRY_USERNAME"),
803                    std::env::var("DOCKER_REGISTRY_PASSWORD"),
804                ) {
805                    Some(FlyioRegistryAuth {
806                        server,
807                        username,
808                        password,
809                    })
810                } else if machine_config.image.starts_with("registry.fly.io/") {
811                    Some(FlyioRegistryAuth {
812                        server: "registry.fly.io".to_string(),
813                        username: "x".to_string(),
814                        password: flyio_client.api_token().to_string(),
815                    })
816                } else {
817                    None
818                };
819
820                match flyio_client
821                    .update_machine(&app_name, machine_id, machine_config, registry_auth)
822                    .await
823                {
824                    Ok(_) => {
825                        let _ = DeploymentLog::create(
826                            pool,
827                            deployment_id_clone,
828                            "info",
829                            "Machine updated and restarting",
830                            None,
831                        )
832                        .await;
833                    }
834                    Err(e) => {
835                        tracing::error!("Redeployment failed for {}: {:#}", deployment_id_clone, e);
836                        let _ = HostedMock::update_status(
837                            pool,
838                            deployment_id_clone,
839                            DeploymentStatus::Failed,
840                            Some(&format!("Redeployment failed: {}", e)),
841                        )
842                        .await;
843                        let _ = DeploymentLog::create(
844                            pool,
845                            deployment_id_clone,
846                            "error",
847                            &format!("Redeployment failed: {}", e),
848                            None,
849                        )
850                        .await;
851                        return;
852                    }
853                }
854            } else {
855                tracing::error!(
856                    "No Fly.io machine ID found for deployment {}",
857                    deployment_id_clone
858                );
859                let _ = HostedMock::update_status(
860                    pool,
861                    deployment_id_clone,
862                    DeploymentStatus::Failed,
863                    Some("No Fly.io machine ID found in deployment metadata"),
864                )
865                .await;
866                return;
867            }
868        }
869
870        // Mark as active
871        let _ =
872            HostedMock::update_status(pool, deployment_id_clone, DeploymentStatus::Active, None)
873                .await;
874
875        let _ = DeploymentLog::create(
876            pool,
877            deployment_id_clone,
878            "info",
879            "Redeployment completed successfully",
880            None,
881        )
882        .await;
883
884        tracing::info!("Successfully redeployed mock service: {}", deployment_id_clone);
885    });
886
887    Ok(Json(serde_json::json!({
888        "id": deployment_id,
889        "status": "deploying",
890        "message": "Redeployment initiated"
891    })))
892}
893
894fn deployment_app_name(deployment: &HostedMock) -> String {
895    format!(
896        "mockforge-{}-{}",
897        deployment
898            .org_id
899            .to_string()
900            .replace('-', "")
901            .chars()
902            .take(8)
903            .collect::<String>(),
904        deployment.slug
905    )
906}
907
908/// Stop a running hosted mock deployment.
909///
910/// Gracefully stops the Fly.io machine (without deleting it) and marks
911/// the deployment as `stopped`. Only active deployments can be stopped.
912pub async fn stop_deployment(
913    State(state): State<AppState>,
914    AuthUser(user_id): AuthUser,
915    headers: HeaderMap,
916    Path(deployment_id): Path<Uuid>,
917) -> ApiResult<Json<DeploymentResponse>> {
918    let pool = state.db.pool();
919
920    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
921        .await
922        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
923
924    let checker = PermissionChecker::new(&state);
925    checker
926        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
927        .await?;
928
929    let deployment = state
930        .store
931        .find_hosted_mock_by_id(deployment_id)
932        .await?
933        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
934
935    if deployment.org_id != org_ctx.org_id {
936        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
937    }
938
939    let status = deployment.status();
940    if !matches!(status, DeploymentStatus::Active) {
941        return Err(ApiError::InvalidRequest(format!(
942            "Cannot stop a deployment with status '{}'. Must be 'active'.",
943            status
944        )));
945    }
946
947    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
948        let flyio_client = FlyioClient::new(flyio_token);
949        let app_name = deployment_app_name(&deployment);
950        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
951
952        if let Some(machine_id) = machine_id {
953            flyio_client.stop_machine(&app_name, machine_id).await.map_err(|e| {
954                ApiError::Internal(anyhow::anyhow!("Failed to stop machine: {}", e))
955            })?;
956        } else {
957            warn!(
958                "No Fly.io machine ID found for deployment {}; marking as stopped anyway",
959                deployment_id
960            );
961        }
962    }
963
964    state
965        .store
966        .update_hosted_mock_status(deployment_id, DeploymentStatus::Stopped, None)
967        .await?;
968
969    DeploymentLog::create(pool, deployment_id, "info", "Deployment stopped", None)
970        .await
971        .ok();
972
973    let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
974        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
975    })?;
976
977    Ok(Json(DeploymentResponse::from(updated)))
978}
979
980/// Start a stopped hosted mock deployment.
981pub async fn start_deployment(
982    State(state): State<AppState>,
983    AuthUser(user_id): AuthUser,
984    headers: HeaderMap,
985    Path(deployment_id): Path<Uuid>,
986) -> ApiResult<Json<DeploymentResponse>> {
987    let pool = state.db.pool();
988
989    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
990        .await
991        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
992
993    let checker = PermissionChecker::new(&state);
994    checker
995        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
996        .await?;
997
998    let deployment = state
999        .store
1000        .find_hosted_mock_by_id(deployment_id)
1001        .await?
1002        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1003
1004    if deployment.org_id != org_ctx.org_id {
1005        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
1006    }
1007
1008    let status = deployment.status();
1009    if !matches!(status, DeploymentStatus::Stopped) {
1010        return Err(ApiError::InvalidRequest(format!(
1011            "Cannot start a deployment with status '{}'. Must be 'stopped'.",
1012            status
1013        )));
1014    }
1015
1016    if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
1017        let flyio_client = FlyioClient::new(flyio_token);
1018        let app_name = deployment_app_name(&deployment);
1019        let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
1020
1021        if let Some(machine_id) = machine_id {
1022            flyio_client.start_machine(&app_name, machine_id).await.map_err(|e| {
1023                ApiError::Internal(anyhow::anyhow!("Failed to start machine: {}", e))
1024            })?;
1025        } else {
1026            return Err(ApiError::InvalidRequest(
1027                "No Fly.io machine ID found in deployment metadata; cannot start".to_string(),
1028            ));
1029        }
1030    }
1031
1032    state
1033        .store
1034        .update_hosted_mock_status(deployment_id, DeploymentStatus::Active, None)
1035        .await?;
1036
1037    DeploymentLog::create(pool, deployment_id, "info", "Deployment started", None)
1038        .await
1039        .ok();
1040
1041    let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
1042        ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
1043    })?;
1044
1045    Ok(Json(DeploymentResponse::from(updated)))
1046}
1047
1048/// Get deployment logs
1049pub async fn get_deployment_logs(
1050    State(state): State<AppState>,
1051    AuthUser(user_id): AuthUser,
1052    headers: HeaderMap,
1053    Path(deployment_id): Path<Uuid>,
1054) -> ApiResult<Json<Vec<LogResponse>>> {
1055    let pool = state.db.pool();
1056
1057    // Resolve org context
1058    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1059        .await
1060        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1061
1062    // Get deployment
1063    let deployment = HostedMock::find_by_id(pool, deployment_id)
1064        .await
1065        .map_err(ApiError::Database)?
1066        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1067
1068    // Verify access
1069    if deployment.org_id != org_ctx.org_id {
1070        return Err(ApiError::InvalidRequest(
1071            "You don't have access to this deployment".to_string(),
1072        ));
1073    }
1074
1075    // Get logs
1076    let logs = DeploymentLog::find_by_mock(pool, deployment_id, Some(100))
1077        .await
1078        .map_err(ApiError::Database)?;
1079
1080    let responses: Vec<LogResponse> = logs.into_iter().map(LogResponse::from).collect();
1081
1082    Ok(Json(responses))
1083}
1084
1085/// Query parameters for the runtime-logs endpoint.
1086#[derive(Debug, Deserialize)]
1087pub struct RuntimeLogsQuery {
1088    /// Maximum entries to return. Falls back to the env-configured default.
1089    pub limit: Option<u32>,
1090    /// RFC3339 timestamp; only entries strictly newer than this are returned.
1091    pub since: Option<String>,
1092}
1093
1094/// Get runtime logs for a deployment by polling the Fly logs API.
1095///
1096/// This is the new "logs" surface (#224). The historical
1097/// `GET /api/v1/hosted-mocks/{id}/logs` endpoint stays as deployment events
1098/// (lifecycle entries from the local `deployment_logs` table) so the UI can
1099/// surface both views: an "Events" tab from the existing endpoint and a
1100/// "Logs" tab backed by this one.
1101pub async fn get_runtime_logs(
1102    State(state): State<AppState>,
1103    AuthUser(user_id): AuthUser,
1104    headers: HeaderMap,
1105    Path(deployment_id): Path<Uuid>,
1106    Query(params): Query<RuntimeLogsQuery>,
1107) -> ApiResult<Json<Vec<LogEntry>>> {
1108    let pool = state.db.pool();
1109
1110    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1111        .await
1112        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1113
1114    let deployment = HostedMock::find_by_id(pool, deployment_id)
1115        .await
1116        .map_err(ApiError::Database)?
1117        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1118
1119    if deployment.org_id != org_ctx.org_id {
1120        return Err(ApiError::InvalidRequest(
1121            "You don't have access to this deployment".to_string(),
1122        ));
1123    }
1124
1125    let Some(client) = crate::fly_logs::global() else {
1126        // No Fly token configured — return empty rather than 500. Operators
1127        // see this in self-hosted / dev; the UI shows a "not configured"
1128        // hint when the list is empty and the configured flag (sent in a
1129        // header below) is false.
1130        return Ok(Json(Vec::new()));
1131    };
1132
1133    let since = params
1134        .since
1135        .as_deref()
1136        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1137        .map(|d| d.with_timezone(&Utc));
1138
1139    let app_name = deployment.fly_app_name();
1140    let entries = client.fetch_recent(&app_name, since, params.limit).await.map_err(|e| {
1141        warn!(error = %e, app_name = %app_name, "Fly runtime logs fetch failed");
1142        ApiError::Internal(anyhow::anyhow!("Fly logs query failed: {}", e))
1143    })?;
1144
1145    Ok(Json(entries))
1146}
1147
1148/// SSE stream of runtime logs. Polls the Fly logs API every few seconds and
1149/// emits new entries as `data:` events. Closes on the first transient error
1150/// after surfacing it as a `data:` event so the browser can render a banner.
1151pub async fn stream_runtime_logs(
1152    State(state): State<AppState>,
1153    AuthUser(user_id): AuthUser,
1154    headers: HeaderMap,
1155    Path(deployment_id): Path<Uuid>,
1156) -> ApiResult<Sse<impl Stream<Item = Result<Event, Infallible>>>> {
1157    let pool = state.db.pool();
1158    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1159        .await
1160        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1161
1162    let deployment = HostedMock::find_by_id(pool, deployment_id)
1163        .await
1164        .map_err(ApiError::Database)?
1165        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1166
1167    if deployment.org_id != org_ctx.org_id {
1168        return Err(ApiError::InvalidRequest(
1169            "You don't have access to this deployment".to_string(),
1170        ));
1171    }
1172
1173    let app_name = deployment.fly_app_name();
1174    let stream = build_runtime_logs_stream(app_name);
1175
1176    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
1177}
1178
1179/// Build the SSE event stream for runtime logs. Each tick polls Fly with a
1180/// `since` cursor that advances past the latest entry observed so we don't
1181/// duplicate events on refetch.
1182fn build_runtime_logs_stream(app_name: String) -> impl Stream<Item = Result<Event, Infallible>> {
1183    use futures_util::stream::unfold;
1184
1185    /// Per-stream poll state: cursor + tick count for an initial empty-poll
1186    /// quietness. We start the cursor a few seconds in the past so the first
1187    /// poll picks up "what's happening now" without dumping the whole window.
1188    struct State {
1189        cursor: DateTime<Utc>,
1190        client: Option<&'static crate::fly_logs::FlyLogsClient>,
1191        app_name: String,
1192        emitted_unconfigured: bool,
1193    }
1194
1195    let state = State {
1196        cursor: Utc::now() - chrono::Duration::seconds(30),
1197        client: crate::fly_logs::global(),
1198        app_name,
1199        emitted_unconfigured: false,
1200    };
1201
1202    unfold(state, |mut st| async move {
1203        // Slow poll loop. 2 seconds keeps the UI responsive without hammering
1204        // Fly's API; tighten once we move to the NATS subscription path
1205        // (#232).
1206        if st.client.is_none() && !st.emitted_unconfigured {
1207            st.emitted_unconfigured = true;
1208            let event = Event::default()
1209                .event("config")
1210                .data("Fly runtime logs are not configured (FLYIO_API_TOKEN unset)");
1211            return Some((Ok(event), st));
1212        }
1213
1214        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1215
1216        let Some(client) = st.client else {
1217            // Configured-flag already emitted; keep stream alive with a
1218            // periodic comment so the connection doesn't close.
1219            let event = Event::default().comment("idle");
1220            return Some((Ok(event), st));
1221        };
1222
1223        match client.fetch_recent(&st.app_name, Some(st.cursor), None).await {
1224            Ok(entries) if entries.is_empty() => {
1225                let event = Event::default().comment("no-new-logs");
1226                Some((Ok(event), st))
1227            }
1228            Ok(entries) => {
1229                // Advance cursor to the newest entry we just emitted.
1230                if let Some(latest) = entries.iter().map(|e| e.timestamp).max() {
1231                    st.cursor = latest;
1232                }
1233                let payload = serde_json::to_string(&entries).unwrap_or_else(|_| "[]".to_string());
1234                let event = Event::default().event("logs").data(payload);
1235                Some((Ok(event), st))
1236            }
1237            Err(err) => {
1238                let payload = serde_json::json!({ "error": err.to_string() }).to_string();
1239                let event = Event::default().event("error").data(payload);
1240                Some((Ok(event), st))
1241            }
1242        }
1243    })
1244}
1245
1246/// One captured request/response event. Mirrors `RequestLogEvent` in
1247/// `mockforge-observability::log_shipper` — both sides serialize via serde
1248/// so the wire format is the canonical contract.
1249#[derive(Debug, Deserialize, Serialize)]
1250pub struct RuntimeRequestEvent {
1251    pub timestamp: DateTime<Utc>,
1252    pub method: String,
1253    pub path: String,
1254    pub status: u16,
1255    pub latency_ms: u32,
1256    #[serde(default)]
1257    pub matched_route: Option<String>,
1258    #[serde(default)]
1259    pub client_ip: Option<String>,
1260    #[serde(default)]
1261    pub user_agent: Option<String>,
1262    #[serde(default)]
1263    pub request_id: Option<String>,
1264    #[serde(default)]
1265    pub bytes_in: Option<i64>,
1266    #[serde(default)]
1267    pub bytes_out: Option<i64>,
1268}
1269
1270#[derive(Debug, Deserialize)]
1271pub struct IngestPayload {
1272    pub events: Vec<RuntimeRequestEvent>,
1273}
1274
1275#[derive(Debug, Serialize)]
1276pub struct IngestResponse {
1277    pub accepted: usize,
1278}
1279
1280/// Ingest a batch of structured request logs from a hosted-mock container.
1281///
1282/// This endpoint is **not** behind the user-scoped `auth_middleware` — it
1283/// authenticates with a deployment-scoped JWT minted by the orchestrator
1284/// at deploy time and passed to the container as `MOCKFORGE_LOG_INGEST_TOKEN`.
1285/// The token's subject embeds the deployment_id; we reject mismatches.
1286pub async fn ingest_runtime_logs(
1287    State(state): State<AppState>,
1288    Path(deployment_id): Path<Uuid>,
1289    headers: HeaderMap,
1290    Json(payload): Json<IngestPayload>,
1291) -> ApiResult<Json<IngestResponse>> {
1292    // Bearer token from the in-container shipper.
1293    let auth = headers
1294        .get("Authorization")
1295        .and_then(|h| h.to_str().ok())
1296        .and_then(|h| h.strip_prefix("Bearer "))
1297        .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1298
1299    let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1300        auth,
1301        &state.config.jwt_secret,
1302    )
1303    .map_err(|e| {
1304        tracing::warn!(error = %e, "Deployment ingest token rejected");
1305        ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1306    })?;
1307
1308    if token_deployment_id != deployment_id {
1309        return Err(ApiError::InvalidRequest(
1310            "Token deployment id does not match URL path".to_string(),
1311        ));
1312    }
1313
1314    if payload.events.is_empty() {
1315        return Ok(Json(IngestResponse { accepted: 0 }));
1316    }
1317
1318    // Cap accepted batch size as a defence against runaway shippers — a
1319    // misbehaving container shouldn't be able to flood the table with one
1320    // request. Matches the shipper's default flush size.
1321    const MAX_BATCH: usize = 500;
1322    let events: Vec<RuntimeRequestEvent> = payload.events.into_iter().take(MAX_BATCH).collect();
1323    let accepted = events.len();
1324
1325    // Bulk insert. Building one INSERT with many VALUES rows would be
1326    // marginally faster but uglier with sqlx; the row count per batch is
1327    // small (50 by default), so per-row inserts in a transaction is fine.
1328    let pool = state.db.pool();
1329    let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1330    for evt in &events {
1331        sqlx::query(
1332            r#"
1333            INSERT INTO runtime_request_logs (
1334                deployment_id, occurred_at, method, path, status, latency_ms,
1335                matched_route, client_ip, user_agent, request_id, bytes_in, bytes_out
1336            )
1337            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
1338            "#,
1339        )
1340        .bind(deployment_id)
1341        .bind(evt.timestamp)
1342        .bind(&evt.method)
1343        .bind(&evt.path)
1344        .bind(evt.status as i16)
1345        .bind(evt.latency_ms as i32)
1346        .bind(evt.matched_route.as_ref())
1347        .bind(evt.client_ip.as_ref())
1348        .bind(evt.user_agent.as_ref())
1349        .bind(evt.request_id.as_ref())
1350        .bind(evt.bytes_in)
1351        .bind(evt.bytes_out)
1352        .execute(&mut *tx)
1353        .await
1354        .map_err(ApiError::Database)?;
1355    }
1356    tx.commit().await.map_err(ApiError::Database)?;
1357
1358    Ok(Json(IngestResponse { accepted }))
1359}
1360
1361// === Capture ingest (#234 part 2) ===
1362//
1363// The recorder ships completed exchanges (request + response) here. The
1364// wire format mirrors `mockforge_recorder::models::RecordedExchange` —
1365// we duplicate just the fields rather than depend on the recorder crate
1366// directly, since the registry server doesn't need any of the recorder's
1367// behaviour. Both sides round-trip through serde, so renames on the
1368// recorder side stay compatible as long as the JSON field names match.
1369
1370#[derive(Debug, Deserialize)]
1371pub struct CaptureIngestRequest {
1372    pub id: String,
1373    pub protocol: String,
1374    pub timestamp: DateTime<Utc>,
1375    pub method: String,
1376    pub path: String,
1377    #[serde(default)]
1378    pub query_params: Option<String>,
1379    pub headers: String,
1380    #[serde(default)]
1381    pub body: Option<String>,
1382    pub body_encoding: String,
1383    #[serde(default)]
1384    pub client_ip: Option<String>,
1385    #[serde(default)]
1386    pub trace_id: Option<String>,
1387    #[serde(default)]
1388    pub span_id: Option<String>,
1389    #[serde(default)]
1390    pub duration_ms: Option<i64>,
1391    #[serde(default)]
1392    pub status_code: Option<i32>,
1393    #[serde(default)]
1394    pub tags: Option<String>,
1395}
1396
1397#[derive(Debug, Deserialize)]
1398pub struct CaptureIngestResponse {
1399    pub status_code: i32,
1400    pub headers: String,
1401    #[serde(default)]
1402    pub body: Option<String>,
1403    pub body_encoding: String,
1404    pub size_bytes: i64,
1405    pub timestamp: DateTime<Utc>,
1406}
1407
1408#[derive(Debug, Deserialize)]
1409pub struct CaptureIngestExchange {
1410    pub request: CaptureIngestRequest,
1411    #[serde(default)]
1412    pub response: Option<CaptureIngestResponse>,
1413}
1414
1415#[derive(Debug, Deserialize)]
1416pub struct CaptureIngestPayload {
1417    pub exchanges: Vec<CaptureIngestExchange>,
1418}
1419
1420/// Persist a batch of recorder captures shipped by an in-container
1421/// `CaptureCloudSyncHandle`. Same deployment-scoped JWT contract as the
1422/// log and OTLP ingest endpoints. Inserts use ON CONFLICT DO NOTHING so
1423/// that re-shipped exchanges (retries, container restarts with cached
1424/// buffer) don't produce duplicates.
1425pub async fn ingest_runtime_captures(
1426    State(state): State<AppState>,
1427    Path(deployment_id): Path<Uuid>,
1428    headers: HeaderMap,
1429    Json(payload): Json<CaptureIngestPayload>,
1430) -> ApiResult<Json<IngestResponse>> {
1431    let auth = headers
1432        .get("Authorization")
1433        .and_then(|h| h.to_str().ok())
1434        .and_then(|h| h.strip_prefix("Bearer "))
1435        .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1436
1437    let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1438        auth,
1439        &state.config.jwt_secret,
1440    )
1441    .map_err(|e| {
1442        tracing::warn!(error = %e, "Capture ingest token rejected");
1443        ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1444    })?;
1445
1446    if token_deployment_id != deployment_id {
1447        return Err(ApiError::InvalidRequest(
1448            "Token deployment id does not match URL path".to_string(),
1449        ));
1450    }
1451
1452    if payload.exchanges.is_empty() {
1453        return Ok(Json(IngestResponse { accepted: 0 }));
1454    }
1455
1456    // Capture rows are larger than log rows (full request + response
1457    // bodies), so the cap is tighter — a runaway shipper shouldn't fill
1458    // a single transaction with megabytes of payload.
1459    const MAX_BATCH: usize = 100;
1460    let exchanges: Vec<CaptureIngestExchange> =
1461        payload.exchanges.into_iter().take(MAX_BATCH).collect();
1462    let accepted = exchanges.len();
1463
1464    let pool = state.db.pool();
1465    let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1466    for exchange in &exchanges {
1467        let req = &exchange.request;
1468        let resp = exchange.response.as_ref();
1469        sqlx::query(
1470            r#"
1471            INSERT INTO runtime_captures (
1472                deployment_id, capture_id, protocol, occurred_at, method, path,
1473                query_params, request_headers, request_body, request_body_encoding,
1474                client_ip, trace_id, span_id, duration_ms, status_code, tags,
1475                response_status_code, response_headers, response_body,
1476                response_body_encoding, response_size_bytes, response_timestamp
1477            )
1478            VALUES (
1479                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
1480                $15, $16, $17, $18, $19, $20, $21, $22
1481            )
1482            ON CONFLICT (deployment_id, capture_id) DO NOTHING
1483            "#,
1484        )
1485        .bind(deployment_id)
1486        .bind(&req.id)
1487        .bind(&req.protocol)
1488        .bind(req.timestamp)
1489        .bind(&req.method)
1490        .bind(&req.path)
1491        .bind(req.query_params.as_ref())
1492        .bind(&req.headers)
1493        .bind(req.body.as_ref())
1494        .bind(&req.body_encoding)
1495        .bind(req.client_ip.as_ref())
1496        .bind(req.trace_id.as_ref())
1497        .bind(req.span_id.as_ref())
1498        .bind(req.duration_ms)
1499        .bind(req.status_code)
1500        .bind(req.tags.as_ref())
1501        .bind(resp.map(|r| r.status_code))
1502        .bind(resp.map(|r| r.headers.as_str()))
1503        .bind(resp.and_then(|r| r.body.as_deref()))
1504        .bind(resp.map(|r| r.body_encoding.as_str()))
1505        .bind(resp.map(|r| r.size_bytes))
1506        .bind(resp.map(|r| r.timestamp))
1507        .execute(&mut *tx)
1508        .await
1509        .map_err(ApiError::Database)?;
1510    }
1511    tx.commit().await.map_err(ApiError::Database)?;
1512
1513    Ok(Json(IngestResponse { accepted }))
1514}
1515
1516#[derive(Debug, Deserialize)]
1517pub struct RuntimeRequestsQuery {
1518    /// Maximum entries to return. Capped at 500 server-side.
1519    pub limit: Option<u32>,
1520    /// RFC3339 timestamp; only entries strictly newer than this are returned.
1521    pub since: Option<String>,
1522}
1523
1524/// Read back recent runtime request logs for a deployment. Powers the
1525/// admin UI's "Requests" tab.
1526pub async fn get_runtime_requests(
1527    State(state): State<AppState>,
1528    AuthUser(user_id): AuthUser,
1529    headers: HeaderMap,
1530    Path(deployment_id): Path<Uuid>,
1531    Query(params): Query<RuntimeRequestsQuery>,
1532) -> ApiResult<Json<Vec<RuntimeRequestEvent>>> {
1533    let pool = state.db.pool();
1534
1535    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1536        .await
1537        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1538
1539    let deployment = HostedMock::find_by_id(pool, deployment_id)
1540        .await
1541        .map_err(ApiError::Database)?
1542        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1543
1544    if deployment.org_id != org_ctx.org_id {
1545        return Err(ApiError::InvalidRequest(
1546            "You don't have access to this deployment".to_string(),
1547        ));
1548    }
1549
1550    let limit = params.limit.unwrap_or(100).min(500) as i64;
1551    let since = params
1552        .since
1553        .as_deref()
1554        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1555        .map(|d| d.with_timezone(&Utc));
1556
1557    type RuntimeRequestRow = (
1558        DateTime<Utc>,
1559        String,
1560        String,
1561        i16,
1562        i32,
1563        Option<String>,
1564        Option<String>,
1565        Option<String>,
1566        Option<String>,
1567        Option<i64>,
1568        Option<i64>,
1569    );
1570
1571    let rows: Vec<RuntimeRequestRow> = if let Some(since) = since {
1572        sqlx::query_as(
1573            r#"
1574            SELECT occurred_at, method, path, status, latency_ms,
1575                   matched_route, client_ip, user_agent, request_id,
1576                   bytes_in, bytes_out
1577            FROM runtime_request_logs
1578            WHERE deployment_id = $1 AND occurred_at > $2
1579            ORDER BY occurred_at DESC
1580            LIMIT $3
1581            "#,
1582        )
1583        .bind(deployment_id)
1584        .bind(since)
1585        .bind(limit)
1586        .fetch_all(pool)
1587        .await
1588        .map_err(ApiError::Database)?
1589    } else {
1590        sqlx::query_as(
1591            r#"
1592            SELECT occurred_at, method, path, status, latency_ms,
1593                   matched_route, client_ip, user_agent, request_id,
1594                   bytes_in, bytes_out
1595            FROM runtime_request_logs
1596            WHERE deployment_id = $1
1597            ORDER BY occurred_at DESC
1598            LIMIT $2
1599            "#,
1600        )
1601        .bind(deployment_id)
1602        .bind(limit)
1603        .fetch_all(pool)
1604        .await
1605        .map_err(ApiError::Database)?
1606    };
1607
1608    let events: Vec<RuntimeRequestEvent> = rows
1609        .into_iter()
1610        .map(|row| RuntimeRequestEvent {
1611            timestamp: row.0,
1612            method: row.1,
1613            path: row.2,
1614            status: row.3 as u16,
1615            latency_ms: row.4 as u32,
1616            matched_route: row.5,
1617            client_ip: row.6,
1618            user_agent: row.7,
1619            request_id: row.8,
1620            bytes_in: row.9,
1621            bytes_out: row.10,
1622        })
1623        .collect();
1624
1625    Ok(Json(events))
1626}
1627
1628/// Proxy a request to the deployment's local recorder API.
1629///
1630/// The mockforge-recorder library is mounted on the deployed `http_app` at
1631/// `/api/recorder/*` (#234), but the deployed instance has no per-user auth
1632/// on those routes. This handler is the cloud-side gate: it verifies the
1633/// caller has access to the deployment, then forwards the request to the
1634/// deployment's internal URL.
1635///
1636/// Captures stay ephemeral on the deployment's local SQLite (Fly machines
1637/// don't have a persistent volume mounted by default). For long-term
1638/// retention we'd want either a Fly volume mount or a forwarder pattern
1639/// like the log shipper — both bigger and tracked separately.
1640async fn proxy_to_deployment_recorder(
1641    deployment: &HostedMock,
1642    path_and_query: &str,
1643) -> ApiResult<axum::http::Response<axum::body::Body>> {
1644    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1645    let Some(base) = base else {
1646        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1647    };
1648    let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1649
1650    let client = reqwest::Client::builder()
1651        .timeout(std::time::Duration::from_secs(10))
1652        .build()
1653        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1654
1655    let resp =
1656        client.get(&url).send().await.map_err(|e| {
1657            ApiError::Internal(anyhow::anyhow!("Recorder proxy fetch failed: {}", e))
1658        })?;
1659
1660    let status = resp.status();
1661    let headers = resp.headers().clone();
1662    let body = resp.bytes().await.map_err(|e| {
1663        ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
1664    })?;
1665
1666    let mut builder = axum::http::Response::builder().status(status);
1667    if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
1668        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
1669    }
1670    builder.body(axum::body::Body::from(body)).map_err(|e| {
1671        ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
1672    })
1673}
1674
1675/// Proxy a list/get to the deployment's state-machine API. Mirrors
1676/// `proxy_to_deployment_recorder` exactly — the only difference is the
1677/// upstream path is `/__mockforge/api/state-machines/*`. Kept as a thin
1678/// wrapper so the call sites read the same way as the recorder ones.
1679async fn proxy_to_deployment_state_machines(
1680    deployment: &HostedMock,
1681    path_and_query: &str,
1682) -> ApiResult<axum::http::Response<axum::body::Body>> {
1683    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1684    let Some(base) = base else {
1685        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1686    };
1687    let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1688
1689    let client = reqwest::Client::builder()
1690        .timeout(std::time::Duration::from_secs(10))
1691        .build()
1692        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1693
1694    let resp = client.get(&url).send().await.map_err(|e| {
1695        ApiError::Internal(anyhow::anyhow!("State-machine proxy fetch failed: {}", e))
1696    })?;
1697    let status = resp.status();
1698    let headers = resp.headers().clone();
1699    let body = resp.bytes().await.map_err(|e| {
1700        ApiError::Internal(anyhow::anyhow!("State-machine proxy read body failed: {}", e))
1701    })?;
1702
1703    let mut builder = axum::http::Response::builder().status(status);
1704    if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
1705        builder = builder.header(axum::http::header::CONTENT_TYPE, ct);
1706    }
1707    builder.body(axum::body::Body::from(body)).map_err(|e| {
1708        ApiError::Internal(anyhow::anyhow!("State-machine proxy response build failed: {}", e))
1709    })
1710}
1711
1712/// List state machines configured on a hosted-mock deployment. Proxies
1713/// `GET /__mockforge/api/state-machines` on the deployed instance.
1714pub async fn list_deployment_state_machines(
1715    State(state): State<AppState>,
1716    AuthUser(user_id): AuthUser,
1717    headers: HeaderMap,
1718    Path(deployment_id): Path<Uuid>,
1719) -> ApiResult<axum::http::Response<axum::body::Body>> {
1720    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1721    proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines").await
1722}
1723
1724/// Get a single state machine definition. Proxies the deployment's
1725/// `GET /__mockforge/api/state-machines/{resource_type}`.
1726pub async fn get_deployment_state_machine(
1727    State(state): State<AppState>,
1728    AuthUser(user_id): AuthUser,
1729    headers: HeaderMap,
1730    Path((deployment_id, resource_type)): Path<(Uuid, String)>,
1731) -> ApiResult<axum::http::Response<axum::body::Body>> {
1732    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1733    if resource_type.contains('/') || resource_type.contains('?') || resource_type.contains('#') {
1734        return Err(ApiError::InvalidRequest("Invalid resource type".to_string()));
1735    }
1736    let path = format!("/__mockforge/api/state-machines/{}", urlencoding::encode(&resource_type));
1737    proxy_to_deployment_state_machines(&deployment, &path).await
1738}
1739
1740/// List instances of state machines on the deployment. Proxies
1741/// `GET /__mockforge/api/state-machines/instances`.
1742pub async fn list_deployment_state_machine_instances(
1743    State(state): State<AppState>,
1744    AuthUser(user_id): AuthUser,
1745    headers: HeaderMap,
1746    Path(deployment_id): Path<Uuid>,
1747) -> ApiResult<axum::http::Response<axum::body::Body>> {
1748    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1749    proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines/instances")
1750        .await
1751}
1752
1753#[derive(Debug, Deserialize)]
1754pub struct RecorderCapturesQuery {
1755    pub limit: Option<u32>,
1756    pub since: Option<String>,
1757}
1758
1759/// List recently captured request/response pairs for a deployment.
1760///
1761/// Reads from the cloud-side `runtime_captures` mirror (#234 part 2)
1762/// when the deployment has any synced rows; falls back to proxying the
1763/// deployment's `/api/recorder/requests` for older deployments that
1764/// haven't been redeployed onto the cloud-sync image. Once a deployment
1765/// has shipped at least one capture, we never proxy again — the
1766/// Postgres path is faster (no extra hop) and durable across restart.
1767pub async fn list_recorder_captures(
1768    State(state): State<AppState>,
1769    AuthUser(user_id): AuthUser,
1770    headers: HeaderMap,
1771    Path(deployment_id): Path<Uuid>,
1772    Query(params): Query<RecorderCapturesQuery>,
1773) -> ApiResult<axum::http::Response<axum::body::Body>> {
1774    let pool = state.db.pool();
1775
1776    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1777        .await
1778        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1779
1780    let deployment = HostedMock::find_by_id(pool, deployment_id)
1781        .await
1782        .map_err(ApiError::Database)?
1783        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1784
1785    if deployment.org_id != org_ctx.org_id {
1786        return Err(ApiError::InvalidRequest(
1787            "You don't have access to this deployment".to_string(),
1788        ));
1789    }
1790
1791    // If this deployment has synced any captures, always serve from
1792    // Postgres. The EXISTS check is index-bound (single row probe on
1793    // the `(deployment_id, occurred_at DESC)` index) and adds <1ms.
1794    let has_synced: bool = sqlx::query_scalar(
1795        "SELECT EXISTS(SELECT 1 FROM runtime_captures WHERE deployment_id = $1 LIMIT 1)",
1796    )
1797    .bind(deployment_id)
1798    .fetch_one(pool)
1799    .await
1800    .map_err(ApiError::Database)?;
1801
1802    if has_synced {
1803        let limit = params.limit.unwrap_or(100).min(500) as i64;
1804        let since = params
1805            .since
1806            .as_deref()
1807            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1808            .map(|d| d.with_timezone(&Utc));
1809        let captures = list_cloud_captures(pool, deployment_id, limit, since).await?;
1810        let body = serde_json::to_vec(&captures).map_err(|e| {
1811            ApiError::InvalidRequest(format!("Failed to serialize captures: {}", e))
1812        })?;
1813        return Ok(axum::http::Response::builder()
1814            .status(axum::http::StatusCode::OK)
1815            .header("content-type", "application/json")
1816            .body(axum::body::Body::from(body))
1817            .unwrap());
1818    }
1819
1820    let mut qs = String::from("/api/recorder/requests");
1821    let mut sep = '?';
1822    if let Some(limit) = params.limit {
1823        qs.push(sep);
1824        qs.push_str(&format!("limit={}", limit));
1825        sep = '&';
1826    }
1827    if let Some(since) = params.since.as_deref() {
1828        qs.push(sep);
1829        qs.push_str(&format!("since={}", urlencoding::encode(since)));
1830    }
1831
1832    let _ = state;
1833    proxy_to_deployment_recorder(&deployment, &qs).await
1834}
1835
1836/// Cloud-Postgres list query mapped into the same shape the recorder
1837/// proxy emits. Kept private — callers go through `list_recorder_captures`.
1838async fn list_cloud_captures(
1839    pool: &sqlx::PgPool,
1840    deployment_id: Uuid,
1841    limit: i64,
1842    since: Option<DateTime<Utc>>,
1843) -> ApiResult<Vec<serde_json::Value>> {
1844    type Row = (
1845        String,         // capture_id
1846        String,         // protocol
1847        DateTime<Utc>,  // occurred_at
1848        String,         // method
1849        String,         // path
1850        Option<String>, // query_params
1851        String,         // request_headers
1852        Option<String>, // request_body
1853        String,         // request_body_encoding
1854        Option<String>, // client_ip
1855        Option<String>, // trace_id
1856        Option<String>, // span_id
1857        Option<i64>,    // duration_ms
1858        Option<i32>,    // status_code
1859        Option<String>, // tags
1860    );
1861
1862    let rows: Vec<Row> = if let Some(since) = since {
1863        sqlx::query_as(
1864            r#"
1865            SELECT capture_id, protocol, occurred_at, method, path, query_params,
1866                   request_headers, request_body, request_body_encoding,
1867                   client_ip, trace_id, span_id, duration_ms, status_code, tags
1868            FROM runtime_captures
1869            WHERE deployment_id = $1 AND occurred_at > $2
1870            ORDER BY occurred_at DESC
1871            LIMIT $3
1872            "#,
1873        )
1874        .bind(deployment_id)
1875        .bind(since)
1876        .bind(limit)
1877        .fetch_all(pool)
1878        .await
1879        .map_err(ApiError::Database)?
1880    } else {
1881        sqlx::query_as(
1882            r#"
1883            SELECT capture_id, protocol, occurred_at, method, path, query_params,
1884                   request_headers, request_body, request_body_encoding,
1885                   client_ip, trace_id, span_id, duration_ms, status_code, tags
1886            FROM runtime_captures
1887            WHERE deployment_id = $1
1888            ORDER BY occurred_at DESC
1889            LIMIT $2
1890            "#,
1891        )
1892        .bind(deployment_id)
1893        .bind(limit)
1894        .fetch_all(pool)
1895        .await
1896        .map_err(ApiError::Database)?
1897    };
1898
1899    let captures = rows
1900        .into_iter()
1901        .map(|r| {
1902            serde_json::json!({
1903                "id": r.0,
1904                "protocol": r.1,
1905                "timestamp": r.2,
1906                "method": r.3,
1907                "path": r.4,
1908                "query_params": r.5,
1909                "headers": r.6,
1910                "body": r.7,
1911                "body_encoding": r.8,
1912                "client_ip": r.9,
1913                "trace_id": r.10,
1914                "span_id": r.11,
1915                "duration_ms": r.12,
1916                "status_code": r.13,
1917                "tags": r.14,
1918            })
1919        })
1920        .collect();
1921    Ok(captures)
1922}
1923
1924/// Get a single capture by id. Cloud-Postgres-first; falls through to
1925/// the deployment proxy when the row hasn't synced.
1926pub async fn get_recorder_capture(
1927    State(state): State<AppState>,
1928    AuthUser(user_id): AuthUser,
1929    headers: HeaderMap,
1930    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
1931) -> ApiResult<axum::http::Response<axum::body::Body>> {
1932    let pool = state.db.pool();
1933
1934    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1935        .await
1936        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1937
1938    let deployment = HostedMock::find_by_id(pool, deployment_id)
1939        .await
1940        .map_err(ApiError::Database)?
1941        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1942
1943    if deployment.org_id != org_ctx.org_id {
1944        return Err(ApiError::InvalidRequest(
1945            "You don't have access to this deployment".to_string(),
1946        ));
1947    }
1948
1949    // Validate the capture id is path-safe — defence in depth even though
1950    // we already URL-encode below.
1951    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
1952        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
1953    }
1954
1955    if let Some(row) = fetch_cloud_capture(pool, deployment_id, &capture_id).await? {
1956        let body = serde_json::to_vec(&row)
1957            .map_err(|e| ApiError::InvalidRequest(format!("Failed to serialize capture: {}", e)))?;
1958        return Ok(axum::http::Response::builder()
1959            .status(axum::http::StatusCode::OK)
1960            .header("content-type", "application/json")
1961            .body(axum::body::Body::from(body))
1962            .unwrap());
1963    }
1964
1965    let path = format!("/api/recorder/requests/{}", urlencoding::encode(&capture_id));
1966    let _ = state;
1967    proxy_to_deployment_recorder(&deployment, &path).await
1968}
1969
1970/// Single-row counterpart to `list_cloud_captures`. Returns None when
1971/// the capture isn't in cloud Postgres — caller can then proxy.
1972async fn fetch_cloud_capture(
1973    pool: &sqlx::PgPool,
1974    deployment_id: Uuid,
1975    capture_id: &str,
1976) -> ApiResult<Option<serde_json::Value>> {
1977    type Row = (
1978        String,
1979        String,
1980        DateTime<Utc>,
1981        String,
1982        String,
1983        Option<String>,
1984        String,
1985        Option<String>,
1986        String,
1987        Option<String>,
1988        Option<String>,
1989        Option<String>,
1990        Option<i64>,
1991        Option<i32>,
1992        Option<String>,
1993    );
1994    let row: Option<Row> = sqlx::query_as(
1995        r#"
1996        SELECT capture_id, protocol, occurred_at, method, path, query_params,
1997               request_headers, request_body, request_body_encoding,
1998               client_ip, trace_id, span_id, duration_ms, status_code, tags
1999        FROM runtime_captures
2000        WHERE deployment_id = $1 AND capture_id = $2
2001        LIMIT 1
2002        "#,
2003    )
2004    .bind(deployment_id)
2005    .bind(capture_id)
2006    .fetch_optional(pool)
2007    .await
2008    .map_err(ApiError::Database)?;
2009    Ok(row.map(|r| {
2010        serde_json::json!({
2011            "id": r.0,
2012            "protocol": r.1,
2013            "timestamp": r.2,
2014            "method": r.3,
2015            "path": r.4,
2016            "query_params": r.5,
2017            "headers": r.6,
2018            "body": r.7,
2019            "body_encoding": r.8,
2020            "client_ip": r.9,
2021            "trace_id": r.10,
2022            "span_id": r.11,
2023            "duration_ms": r.12,
2024            "status_code": r.13,
2025            "tags": r.14,
2026        })
2027    }))
2028}
2029
2030/// Get the response body associated with a capture. Recorder splits the
2031/// request and response on separate endpoints so callers can paginate
2032/// requests cheaply without dragging response payloads along.
2033pub async fn get_recorder_capture_response(
2034    State(state): State<AppState>,
2035    AuthUser(user_id): AuthUser,
2036    headers: HeaderMap,
2037    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2038) -> ApiResult<axum::http::Response<axum::body::Body>> {
2039    let pool = state.db.pool();
2040
2041    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2042        .await
2043        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2044
2045    let deployment = HostedMock::find_by_id(pool, deployment_id)
2046        .await
2047        .map_err(ApiError::Database)?
2048        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2049
2050    if deployment.org_id != org_ctx.org_id {
2051        return Err(ApiError::InvalidRequest(
2052            "You don't have access to this deployment".to_string(),
2053        ));
2054    }
2055
2056    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2057        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2058    }
2059
2060    if let Some(row) = fetch_cloud_capture_response(pool, deployment_id, &capture_id).await? {
2061        let body = serde_json::to_vec(&row).map_err(|e| {
2062            ApiError::InvalidRequest(format!("Failed to serialize response: {}", e))
2063        })?;
2064        return Ok(axum::http::Response::builder()
2065            .status(axum::http::StatusCode::OK)
2066            .header("content-type", "application/json")
2067            .body(axum::body::Body::from(body))
2068            .unwrap());
2069    }
2070
2071    let path = format!("/api/recorder/requests/{}/response", urlencoding::encode(&capture_id));
2072    let _ = state;
2073    proxy_to_deployment_recorder(&deployment, &path).await
2074}
2075
2076/// Fetch the response side of a capture from cloud Postgres. Returns
2077/// None when the row isn't synced or the response side hasn't been
2078/// recorded yet (request-only exchanges).
2079async fn fetch_cloud_capture_response(
2080    pool: &sqlx::PgPool,
2081    deployment_id: Uuid,
2082    capture_id: &str,
2083) -> ApiResult<Option<serde_json::Value>> {
2084    type Row = (Option<i32>, Option<String>, Option<String>, Option<String>, Option<i64>);
2085    let row: Option<Row> = sqlx::query_as(
2086        r#"
2087        SELECT response_status_code, response_headers, response_body,
2088               response_body_encoding, response_size_bytes
2089        FROM runtime_captures
2090        WHERE deployment_id = $1 AND capture_id = $2
2091        LIMIT 1
2092        "#,
2093    )
2094    .bind(deployment_id)
2095    .bind(capture_id)
2096    .fetch_optional(pool)
2097    .await
2098    .map_err(ApiError::Database)?;
2099    Ok(row.and_then(|r| {
2100        let status_code = r.0?;
2101        Some(serde_json::json!({
2102            "status_code": status_code,
2103            "headers": r.1.unwrap_or_else(|| "{}".to_string()),
2104            "body": r.2,
2105            "body_encoding": r.3.unwrap_or_else(|| "utf8".to_string()),
2106            "size_bytes": r.4.unwrap_or(0),
2107        }))
2108    }))
2109}
2110
2111/// Proxy a POST to the deployment's recorder API. Used by the
2112/// enable/disable/clear mutations below — same auth model as the GET
2113/// proxies (user JWT gates access; the deployment itself has no
2114/// per-user auth on `/api/recorder/*`).
2115async fn proxy_post_to_deployment_recorder(
2116    deployment: &HostedMock,
2117    path: &str,
2118) -> ApiResult<axum::http::Response<axum::body::Body>> {
2119    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2120    let Some(base) = base else {
2121        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2122    };
2123    let url = format!("{}{}", base.trim_end_matches('/'), path);
2124
2125    let client = reqwest::Client::builder()
2126        .timeout(std::time::Duration::from_secs(10))
2127        .build()
2128        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2129
2130    let resp =
2131        client.post(&url).send().await.map_err(|e| {
2132            ApiError::Internal(anyhow::anyhow!("Recorder proxy POST failed: {}", e))
2133        })?;
2134
2135    let status = resp.status();
2136    let headers = resp.headers().clone();
2137    let body = resp.bytes().await.map_err(|e| {
2138        ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
2139    })?;
2140
2141    let mut builder = axum::http::Response::builder().status(status);
2142    if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
2143        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2144    }
2145    builder.body(axum::body::Body::from(body)).map_err(|e| {
2146        ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
2147    })
2148}
2149
2150async fn check_org_access(
2151    state: &AppState,
2152    user_id: Uuid,
2153    headers: &HeaderMap,
2154    deployment_id: Uuid,
2155) -> ApiResult<HostedMock> {
2156    let pool = state.db.pool();
2157
2158    let org_ctx = resolve_org_context(state, user_id, headers, None)
2159        .await
2160        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2161
2162    let deployment = HostedMock::find_by_id(pool, deployment_id)
2163        .await
2164        .map_err(ApiError::Database)?
2165        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2166
2167    if deployment.org_id != org_ctx.org_id {
2168        return Err(ApiError::InvalidRequest(
2169            "You don't have access to this deployment".to_string(),
2170        ));
2171    }
2172    Ok(deployment)
2173}
2174
2175/// Get current recorder enabled state. Proxies GET /api/recorder/status so
2176/// the UI's Captures tab can render the toggle as a real read+write — the
2177/// Enable/Disable buttons used to be fire-and-forget with no read side.
2178pub async fn get_recorder_status(
2179    State(state): State<AppState>,
2180    AuthUser(user_id): AuthUser,
2181    headers: HeaderMap,
2182    Path(deployment_id): Path<Uuid>,
2183) -> ApiResult<axum::http::Response<axum::body::Body>> {
2184    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2185    proxy_to_deployment_recorder(&deployment, "/api/recorder/status").await
2186}
2187
2188/// Enable recording on the deployment. Proxies POST /api/recorder/enable.
2189pub async fn enable_recorder(
2190    State(state): State<AppState>,
2191    AuthUser(user_id): AuthUser,
2192    headers: HeaderMap,
2193    Path(deployment_id): Path<Uuid>,
2194) -> ApiResult<axum::http::Response<axum::body::Body>> {
2195    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2196    proxy_post_to_deployment_recorder(&deployment, "/api/recorder/enable").await
2197}
2198
2199/// Disable recording on the deployment. Proxies POST /api/recorder/disable.
2200pub async fn disable_recorder(
2201    State(state): State<AppState>,
2202    AuthUser(user_id): AuthUser,
2203    headers: HeaderMap,
2204    Path(deployment_id): Path<Uuid>,
2205) -> ApiResult<axum::http::Response<axum::body::Body>> {
2206    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2207    proxy_post_to_deployment_recorder(&deployment, "/api/recorder/disable").await
2208}
2209
2210/// Clear all captures on the deployment. Proxies DELETE /api/recorder/clear.
2211/// Note: the deployment's clear endpoint is DELETE; we POST through to it
2212/// here because typing the verb at the cloud layer doesn't change much
2213/// and POST is friendlier for browser fetch without preflight.
2214pub async fn clear_recorder(
2215    State(state): State<AppState>,
2216    AuthUser(user_id): AuthUser,
2217    headers: HeaderMap,
2218    Path(deployment_id): Path<Uuid>,
2219) -> ApiResult<axum::http::Response<axum::body::Body>> {
2220    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2221    // Build a DELETE on the wire since that's what the recorder defines.
2222    let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2223    let Some(base) = base else {
2224        return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2225    };
2226    let url = format!("{}/api/recorder/clear", base.trim_end_matches('/'));
2227    let client = reqwest::Client::builder()
2228        .timeout(std::time::Duration::from_secs(10))
2229        .build()
2230        .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2231    let resp =
2232        client.delete(&url).send().await.map_err(|e| {
2233            ApiError::Internal(anyhow::anyhow!("Recorder clear proxy failed: {}", e))
2234        })?;
2235    let status = resp.status();
2236    let headers_resp = resp.headers().clone();
2237    let body = resp.bytes().await.map_err(|e| {
2238        ApiError::Internal(anyhow::anyhow!("Recorder clear read body failed: {}", e))
2239    })?;
2240    let mut builder = axum::http::Response::builder().status(status);
2241    if let Some(content_type) = headers_resp.get(axum::http::header::CONTENT_TYPE) {
2242        builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2243    }
2244    builder.body(axum::body::Body::from(body)).map_err(|e| {
2245        ApiError::Internal(anyhow::anyhow!("Recorder clear response build failed: {}", e))
2246    })
2247}
2248
2249/// Replay a captured request against the deployment. The recorder
2250/// records the request envelope; replay re-executes it and returns the
2251/// fresh response — useful for "did the bug we captured get fixed yet"
2252/// or "check whether a chaos rule still triggers."
2253pub async fn replay_recorder_capture(
2254    State(state): State<AppState>,
2255    AuthUser(user_id): AuthUser,
2256    headers: HeaderMap,
2257    Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2258) -> ApiResult<axum::http::Response<axum::body::Body>> {
2259    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2260    if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2261        return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2262    }
2263    let path = format!("/api/recorder/replay/{}", urlencoding::encode(&capture_id));
2264    proxy_post_to_deployment_recorder(&deployment, &path).await
2265}
2266
2267/// Export the deployment's recorder captures as HAR. Proxies the
2268/// recorder's existing `/api/recorder/export/har` endpoint and forwards
2269/// the response unchanged. The browser handles the download via a blob
2270/// URL on the UI side.
2271pub async fn export_recorder_captures_har(
2272    State(state): State<AppState>,
2273    AuthUser(user_id): AuthUser,
2274    headers: HeaderMap,
2275    Path(deployment_id): Path<Uuid>,
2276) -> ApiResult<axum::http::Response<axum::body::Body>> {
2277    let pool = state.db.pool();
2278
2279    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2280        .await
2281        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2282
2283    let deployment = HostedMock::find_by_id(pool, deployment_id)
2284        .await
2285        .map_err(ApiError::Database)?
2286        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2287
2288    if deployment.org_id != org_ctx.org_id {
2289        return Err(ApiError::InvalidRequest(
2290            "You don't have access to this deployment".to_string(),
2291        ));
2292    }
2293
2294    let _ = state;
2295    proxy_to_deployment_recorder(&deployment, "/api/recorder/export/har").await
2296}
2297
2298/// Export the deployment's recorder captures as JSONL — one
2299/// `RecordedExchange` per line, all protocols. Companion to the HAR
2300/// export. JSONL is the format the local `mockforge-cli replay` reads,
2301/// so this is the round-trip path: capture in cloud → download → replay
2302/// locally.
2303pub async fn export_recorder_captures_jsonl(
2304    State(state): State<AppState>,
2305    AuthUser(user_id): AuthUser,
2306    headers: HeaderMap,
2307    Path(deployment_id): Path<Uuid>,
2308) -> ApiResult<axum::http::Response<axum::body::Body>> {
2309    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2310    proxy_to_deployment_recorder(&deployment, "/api/recorder/export/jsonl").await
2311}
2312
2313/// Get deployment metrics
2314pub async fn get_deployment_metrics(
2315    State(state): State<AppState>,
2316    AuthUser(user_id): AuthUser,
2317    headers: HeaderMap,
2318    Path(deployment_id): Path<Uuid>,
2319) -> ApiResult<Json<MetricsResponse>> {
2320    let pool = state.db.pool();
2321
2322    // Resolve org context
2323    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2324        .await
2325        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2326
2327    // Get deployment
2328    let deployment = HostedMock::find_by_id(pool, deployment_id)
2329        .await
2330        .map_err(ApiError::Database)?
2331        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2332
2333    // Verify access
2334    if deployment.org_id != org_ctx.org_id {
2335        return Err(ApiError::InvalidRequest(
2336            "You don't have access to this deployment".to_string(),
2337        ));
2338    }
2339
2340    // Prefer live metrics from Fly Managed Prometheus when configured.
2341    // Falls back to the local `deployment_metrics` table when:
2342    //   * Fly Prometheus env vars aren't set (dev / self-hosted), or
2343    //   * the query fails (transient network / auth error).
2344    if let Some(client) = crate::fly_metrics::global() {
2345        let app_name = deployment.fly_app_name();
2346        match client.snapshot_for_app(&app_name).await {
2347            Ok(snap) => {
2348                use chrono::Datelike;
2349                let now = Utc::now().date_naive();
2350                let period_start =
2351                    chrono::NaiveDate::from_ymd_opt(now.year(), now.month(), 1).unwrap_or(now);
2352                return Ok(Json(MetricsResponse {
2353                    requests: snap.requests,
2354                    requests_2xx: snap.requests_2xx,
2355                    requests_4xx: snap.requests_4xx,
2356                    requests_5xx: snap.requests_5xx,
2357                    egress_bytes: snap.egress_bytes,
2358                    avg_response_time_ms: snap.avg_response_time_ms,
2359                    period_start,
2360                }));
2361            }
2362            Err(err) => {
2363                tracing::warn!(
2364                    app_name = %app_name,
2365                    error = %err,
2366                    "Fly Prometheus metrics query failed; falling back to local counters"
2367                );
2368            }
2369        }
2370    }
2371
2372    // Fallback: return the local aggregate counters. Until the in-container
2373    // log shipper lands (#232) this table has no writer and returns zeros.
2374    let metrics = DeploymentMetrics::get_or_create_current(pool, deployment_id)
2375        .await
2376        .map_err(ApiError::Database)?;
2377
2378    Ok(Json(MetricsResponse::from(metrics)))
2379}
2380
2381/// Upload an OpenAPI spec file for use in a hosted mock deployment
2382pub async fn upload_spec(
2383    State(state): State<AppState>,
2384    AuthUser(user_id): AuthUser,
2385    headers: HeaderMap,
2386    mut multipart: Multipart,
2387) -> ApiResult<Json<SpecUploadResponse>> {
2388    // Resolve org context
2389    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2390        .await
2391        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2392
2393    // Check permission
2394    let checker = PermissionChecker::new(&state);
2395    checker
2396        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2397        .await?;
2398
2399    // Extract file from multipart
2400    let mut file_data: Option<Vec<u8>> = None;
2401    let mut file_name = String::from("spec");
2402
2403    while let Some(field) = multipart
2404        .next_field()
2405        .await
2406        .map_err(|e| ApiError::InvalidRequest(format!("Failed to read multipart field: {}", e)))?
2407    {
2408        if field.name() == Some("file") || field.name() == Some("spec") {
2409            if let Some(name) = field.file_name() {
2410                file_name =
2411                    name.to_string().replace(".yaml", "").replace(".yml", "").replace(".json", "");
2412            }
2413            let data = field.bytes().await.map_err(|e| {
2414                ApiError::InvalidRequest(format!("Failed to read file data: {}", e))
2415            })?;
2416
2417            // Validate it's valid JSON or YAML OpenAPI spec
2418            let content = String::from_utf8(data.to_vec()).map_err(|_| {
2419                ApiError::InvalidRequest("File must be valid UTF-8 text".to_string())
2420            })?;
2421
2422            // Try to parse as JSON first, then YAML
2423            let spec_value: serde_json::Value =
2424                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&content) {
2425                    v
2426                } else if let Ok(v) = serde_yaml::from_str::<serde_json::Value>(&content) {
2427                    v
2428                } else {
2429                    return Err(ApiError::InvalidRequest(
2430                        "File must be a valid JSON or YAML OpenAPI specification".to_string(),
2431                    ));
2432                };
2433
2434            // Basic OpenAPI validation - check for required fields
2435            if spec_value.get("openapi").is_none() && spec_value.get("swagger").is_none() {
2436                return Err(ApiError::InvalidRequest(
2437                    "File must contain an 'openapi' or 'swagger' field".to_string(),
2438                ));
2439            }
2440
2441            // Always store as JSON
2442            let json_data = serde_json::to_vec_pretty(&spec_value).map_err(|e| {
2443                ApiError::Internal(anyhow::anyhow!("Failed to serialize spec: {}", e))
2444            })?;
2445
2446            file_data = Some(json_data);
2447        }
2448    }
2449
2450    let data = file_data.ok_or_else(|| {
2451        ApiError::InvalidRequest("No 'file' or 'spec' field in upload".to_string())
2452    })?;
2453
2454    // Upload to storage
2455    let url = state
2456        .storage
2457        .upload_spec(&org_ctx.org_id.to_string(), &file_name, data)
2458        .await
2459        .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to upload spec: {}", e)))?;
2460
2461    Ok(Json(SpecUploadResponse { url }))
2462}
2463
/// Response body returned by `upload_spec`.
#[derive(Debug, Serialize)]
pub struct SpecUploadResponse {
    /// URL returned by the storage backend for the uploaded spec.
    pub url: String,
}
2468
2469// Request/Response types
2470
/// JSON request body accepted by `create_deployment`.
#[derive(Debug, Deserialize)]
pub struct CreateDeploymentRequest {
    /// Name of the deployment.
    pub name: String,
    /// Optional slug. NOTE(review): presumably derived by the handler when
    /// omitted — confirm against `create_deployment`.
    pub slug: Option<String>,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional project the deployment belongs to.
    pub project_id: Option<Uuid>,
    /// Deployment configuration as arbitrary JSON.
    pub config_json: serde_json::Value,
    /// Optional URL of the OpenAPI spec backing the deployment.
    pub openapi_spec_url: Option<String>,
    /// Optional region. NOTE(review): the default applied when omitted is
    /// decided by the handler, outside this excerpt.
    pub region: Option<String>,
    /// Protocols to expose on the deployment. HTTP is implicit and always
    /// included. Items beyond Free-tier (gRPC, brokers) require a higher
    /// plan and are rejected at create time if the org isn't entitled.
    /// Persisted into `config_json["enabled_protocols"]`.
    #[serde(default)]
    pub enabled_protocols: Option<Vec<crate::models::Protocol>>,
    /// Optional upstream URL the deployment proxies to when the reality
    /// slider is > 0 (#222). When unset, the slider is a no-op and
    /// responses always come from the mock. Persisted into
    /// `config_json["upstream_url"]`.
    #[serde(default)]
    pub upstream_url: Option<String>,
}
2493
/// JSON request body describing a status change for a deployment.
///
/// NOTE(review): the handler consuming this type is outside this excerpt;
/// the field notes below are limited to what the types show.
#[derive(Debug, Deserialize)]
pub struct UpdateStatusRequest {
    /// New status as a string. Presumably one of the `DeploymentStatus`
    /// names — confirm against the handler.
    pub status: String,
    /// Optional error message accompanying the status.
    pub error_message: Option<String>,
    /// Optional public URL of the deployment.
    pub deployment_url: Option<String>,
    /// Optional internal URL of the deployment.
    pub internal_url: Option<String>,
}
2501
/// API representation of a [`HostedMock`], produced by
/// `From<HostedMock> for DeploymentResponse`.
#[derive(Debug, Serialize)]
pub struct DeploymentResponse {
    /// Deployment id.
    pub id: Uuid,
    /// Owning organization.
    pub org_id: Uuid,
    /// Project the deployment belongs to, if any.
    pub project_id: Option<Uuid>,
    /// Deployment name.
    pub name: String,
    /// Deployment slug.
    pub slug: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// String form of `HostedMock::status()`.
    pub status: String,
    /// Public URL of the deployment, if one has been assigned.
    pub deployment_url: Option<String>,
    /// URL of the OpenAPI spec backing the deployment, if any.
    pub openapi_spec_url: Option<String>,
    /// Region the deployment is placed in.
    pub region: String,
    /// Instance type of the deployment.
    pub instance_type: String,
    /// String form of `HostedMock::health_status()`.
    pub health_status: String,
    /// Last recorded error message, if any.
    pub error_message: Option<String>,
    /// Value of `HostedMock::enabled_protocols()`.
    pub enabled_protocols: Vec<crate::models::Protocol>,
    /// Upstream URL the deployment proxies to when the reality slider is > 0.
    /// Persisted inside `config_json["upstream_url"]`; surfaced here so the
    /// UI can display and (eventually) edit it without reparsing config_json.
    pub upstream_url: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp (UTC).
    pub updated_at: DateTime<Utc>,
}
2525
2526impl From<HostedMock> for DeploymentResponse {
2527    fn from(mock: HostedMock) -> Self {
2528        let status = mock.status().to_string();
2529        let health_status = mock.health_status().to_string();
2530        let enabled_protocols = mock.enabled_protocols();
2531        let upstream_url = mock.upstream_url();
2532        Self {
2533            id: mock.id,
2534            org_id: mock.org_id,
2535            project_id: mock.project_id,
2536            name: mock.name,
2537            slug: mock.slug,
2538            description: mock.description,
2539            status,
2540            deployment_url: mock.deployment_url,
2541            openapi_spec_url: mock.openapi_spec_url,
2542            region: mock.region,
2543            instance_type: mock.instance_type,
2544            health_status,
2545            error_message: mock.error_message,
2546            enabled_protocols,
2547            upstream_url,
2548            created_at: mock.created_at,
2549            updated_at: mock.updated_at,
2550        }
2551    }
2552}
2553
/// API representation of a [`DeploymentLog`] entry, produced by
/// `From<DeploymentLog> for LogResponse`.
#[derive(Debug, Serialize)]
pub struct LogResponse {
    /// Log entry id.
    pub id: Uuid,
    /// Log level as stored on the entry (e.g. `"info"`).
    pub level: String,
    /// Log message text.
    pub message: String,
    /// Copied from `DeploymentLog::metadata_json`.
    pub metadata: serde_json::Value,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
2562
2563impl From<DeploymentLog> for LogResponse {
2564    fn from(log: DeploymentLog) -> Self {
2565        Self {
2566            id: log.id,
2567            level: log.level,
2568            message: log.message,
2569            metadata: log.metadata_json,
2570            created_at: log.created_at,
2571        }
2572    }
2573}
2574
/// Metrics payload for a deployment.
///
/// Built either from a live Fly Prometheus snapshot or, as a fallback, from
/// the local [`DeploymentMetrics`] row via `From<DeploymentMetrics>`.
#[derive(Debug, Serialize)]
pub struct MetricsResponse {
    /// Total request count.
    pub requests: i64,
    /// Requests answered with a 2xx status.
    pub requests_2xx: i64,
    /// Requests answered with a 4xx status.
    pub requests_4xx: i64,
    /// Requests answered with a 5xx status.
    pub requests_5xx: i64,
    /// Egress volume in bytes.
    pub egress_bytes: i64,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: i64,
    /// Start date of the reporting period. On the live-metrics path this is
    /// the first day of the current UTC month; on the fallback path it is
    /// copied from `DeploymentMetrics::period_start`.
    pub period_start: chrono::NaiveDate,
}
2585
2586impl From<DeploymentMetrics> for MetricsResponse {
2587    fn from(metrics: DeploymentMetrics) -> Self {
2588        Self {
2589            requests: metrics.requests,
2590            requests_2xx: metrics.requests_2xx,
2591            requests_4xx: metrics.requests_4xx,
2592            requests_5xx: metrics.requests_5xx,
2593            egress_bytes: metrics.egress_bytes,
2594            avg_response_time_ms: metrics.avg_response_time_ms,
2595            period_start: metrics.period_start,
2596        }
2597    }
2598}
2599
/// JSON request body accepted by `set_domain`.
#[derive(Debug, Deserialize)]
pub struct SetDomainRequest {
    /// Parent domain for the deployment. The handler prefixes the
    /// deployment slug, so the resulting hostname is `<slug>.<domain>`.
    pub domain: String,
}
2604
2605/// Set a custom domain for a deployment.
2606///
2607/// Adds a TLS certificate on the registry server Fly.io app so that
2608/// `<slug>.<domain>` terminates TLS here, then the proxy fallback
2609/// handler forwards traffic to the deployment's internal URL.
2610pub async fn set_domain(
2611    State(state): State<AppState>,
2612    AuthUser(user_id): AuthUser,
2613    headers: HeaderMap,
2614    Path(deployment_id): Path<Uuid>,
2615    Json(request): Json<SetDomainRequest>,
2616) -> ApiResult<Json<serde_json::Value>> {
2617    let pool = state.db.pool();
2618
2619    // Resolve org context
2620    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2621        .await
2622        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2623
2624    // Check permission
2625    let checker = PermissionChecker::new(&state);
2626    checker
2627        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2628        .await?;
2629
2630    // Get deployment
2631    let deployment = HostedMock::find_by_id(pool, deployment_id)
2632        .await
2633        .map_err(ApiError::Database)?
2634        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2635
2636    // Verify ownership
2637    if deployment.org_id != org_ctx.org_id {
2638        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2639    }
2640
2641    let hostname = format!("{}.{}", deployment.slug, request.domain);
2642
2643    // Update deployment URL to use the custom domain. A wildcard TLS cert
2644    // on the registry app covers all subdomains, so no per-deployment
2645    // certificate management is needed. Persist `custom_domain` in
2646    // metadata_json so we have an authoritative read-side for the lifecycle
2647    // endpoints below — `deployment_url` alone is ambiguous (it could be a
2648    // MOCKFORGE_MOCKS_DOMAIN-based default).
2649    let new_url = format!("https://{}", hostname);
2650    sqlx::query(
2651        r#"
2652        UPDATE hosted_mocks
2653        SET deployment_url = $1,
2654            metadata_json = jsonb_set(
2655                COALESCE(metadata_json, '{}'::jsonb),
2656                '{custom_domain}',
2657                to_jsonb($2::text)
2658            ),
2659            updated_at = NOW()
2660        WHERE id = $3
2661        "#,
2662    )
2663    .bind(&new_url)
2664    .bind(&hostname)
2665    .bind(deployment_id)
2666    .execute(pool)
2667    .await
2668    .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to update deployment URL: {}", e)))?;
2669
2670    DeploymentLog::create(
2671        pool,
2672        deployment_id,
2673        "info",
2674        &format!("Custom domain set: {}", hostname),
2675        None,
2676    )
2677    .await
2678    .ok();
2679
2680    Ok(Json(serde_json::json!({
2681        "hostname": hostname,
2682        "deployment_url": new_url,
2683    })))
2684}
2685
2686/// Read the currently bound custom domain for a deployment, if any. Returns
2687/// `{ "hostname": null }` when the deployment is on its default URL.
2688pub async fn get_custom_domain(
2689    State(state): State<AppState>,
2690    AuthUser(user_id): AuthUser,
2691    headers: HeaderMap,
2692    Path(deployment_id): Path<Uuid>,
2693) -> ApiResult<Json<serde_json::Value>> {
2694    let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2695    let hostname = deployment
2696        .metadata_json
2697        .get("custom_domain")
2698        .and_then(|v| v.as_str())
2699        .map(|s| s.to_string());
2700    Ok(Json(serde_json::json!({
2701        "hostname": hostname,
2702        "deployment_url": deployment.deployment_url,
2703    })))
2704}
2705
2706/// Remove the custom domain mapping. Reverts `deployment_url` to the
2707/// MOCKFORGE_MOCKS_DOMAIN-based default if configured, or the Fly.io
2708/// default `https://<fly_app>.fly.dev` otherwise. The wildcard TLS cert
2709/// on the registry app stays — there is no per-deployment cert to clean
2710/// up because the set path never created one.
2711pub async fn clear_custom_domain(
2712    State(state): State<AppState>,
2713    AuthUser(user_id): AuthUser,
2714    headers: HeaderMap,
2715    Path(deployment_id): Path<Uuid>,
2716) -> ApiResult<Json<serde_json::Value>> {
2717    let pool = state.db.pool();
2718
2719    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2720        .await
2721        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2722
2723    let checker = PermissionChecker::new(&state);
2724    checker
2725        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2726        .await?;
2727
2728    let deployment = HostedMock::find_by_id(pool, deployment_id)
2729        .await
2730        .map_err(ApiError::Database)?
2731        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2732    if deployment.org_id != org_ctx.org_id {
2733        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2734    }
2735
2736    let app_name = deployment.fly_app_name();
2737    let default_url = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
2738        format!("https://{}.{}", deployment.slug, domain)
2739    } else {
2740        format!("https://{}.fly.dev", app_name)
2741    };
2742
2743    sqlx::query(
2744        r#"
2745        UPDATE hosted_mocks
2746        SET deployment_url = $1,
2747            metadata_json = COALESCE(metadata_json, '{}'::jsonb) - 'custom_domain',
2748            updated_at = NOW()
2749        WHERE id = $2
2750        "#,
2751    )
2752    .bind(&default_url)
2753    .bind(deployment_id)
2754    .execute(pool)
2755    .await
2756    .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to clear custom domain: {}", e)))?;
2757
2758    DeploymentLog::create(pool, deployment_id, "info", "Custom domain removed", None)
2759        .await
2760        .ok();
2761
2762    Ok(Json(serde_json::json!({
2763        "hostname": serde_json::Value::Null,
2764        "deployment_url": default_url,
2765    })))
2766}
2767
2768// ─── Smoke-test trigger (Issue #392) ─────────────────────────────────
2769
/// Optional overrides on the smoke run. Both fields fall back to the
/// runner's executor defaults when omitted (5s latency budget, GET-only).
///
/// Deserialized with camelCase keys, so the wire names are
/// `latencyBudgetMs` and `methods`; because of `#[serde(default)]` any
/// missing key leaves the corresponding field as `None`.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct TriggerSmokeRunRequest {
    /// Per-route latency assertion ceiling, in milliseconds.
    pub latency_budget_ms: Option<u64>,
    /// HTTP methods to probe. Defaults to `["GET"]` at the executor.
    /// Currently only GET has been thought through — POST/PUT/PATCH need
    /// a body source which v1 doesn't have.
    pub methods: Option<Vec<String>>,
}
2782
2783/// `POST /api/v1/hosted-mocks/{deployment_id}/smoke-runs`
2784///
2785/// Triggers a smoke test against a hosted-mock deployment. Reuses the
2786/// existing `test_runs` lifecycle with `kind = "smoke"` so smokes share
2787/// the runner pool, concurrency cap, and runner_seconds metering with
2788/// every other run kind. The runner-side `SmokeTestExecutor` (see
2789/// `crates/mockforge-test-runner/src/executors/smoke.rs`) walks the
2790/// deployment's OpenAPI spec, probes each declared route against the
2791/// deployment's public URL, and reports `route_pass` / `route_fail`
2792/// events back via the internal callbacks.
2793///
2794/// Authorization: caller must hold `Permission::HostedMockUpdate` on
2795/// the deployment's org. Cross-org access surfaces as
2796/// "Deployment not found" rather than "forbidden" (matches the
2797/// convention in `delete_deployment`).
2798///
2799/// Failure modes (all 400 InvalidRequest):
2800///   - Deployment not found / not in caller's org.
2801///   - Deployment is not in `running` status — running smoke against a
2802///     deployment that's still provisioning or has crashed gives
2803///     misleading red routes that aren't actually regressions.
2804///   - Deployment has no `deployment_url` (still being provisioned).
2805///   - Deployment has no `openapi_spec_url` (no spec uploaded yet).
2806pub async fn trigger_smoke_run(
2807    State(state): State<AppState>,
2808    AuthUser(user_id): AuthUser,
2809    Path(deployment_id): Path<Uuid>,
2810    headers: HeaderMap,
2811    body: Option<Json<TriggerSmokeRunRequest>>,
2812) -> ApiResult<Json<TestRun>> {
2813    let req = body.map(|Json(r)| r).unwrap_or_default();
2814
2815    // ─── Auth + deployment lookup ────────────────────────────────
2816    let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2817        .await
2818        .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2819
2820    let checker = PermissionChecker::new(&state);
2821    checker
2822        .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
2823        .await?;
2824
2825    let deployment = state
2826        .store
2827        .find_hosted_mock_by_id(deployment_id)
2828        .await?
2829        .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2830    if deployment.org_id != org_ctx.org_id {
2831        // Hide existence from non-members of the deployment's org.
2832        return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2833    }
2834    if deployment.status != "active" {
2835        // `active` is the canonical "deployment is up + serving" status
2836        // (see `DeploymentStatus::Active` in mockforge-registry-core).
2837        // Anything else (`pending`, `deploying`, `stopped`, `failed`,
2838        // `deleting`) means we'd be probing a deployment that isn't
2839        // actually serving traffic — every route would surface as a
2840        // misleading red.
2841        return Err(ApiError::InvalidRequest(format!(
2842            "Deployment is in '{}' status; smoke runs require 'active'",
2843            deployment.status,
2844        )));
2845    }
2846
2847    // ─── Plan-limit gate (shared with every other test_run kind) ──
2848    let limits = crate::handlers::usage::effective_limits(&state, &org_ctx.org).await?;
2849    let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
2850    if max_concurrent == 0 {
2851        return Err(ApiError::ResourceLimitExceeded(
2852            "Test execution is not enabled on this plan".into(),
2853        ));
2854    }
2855    if max_concurrent > 0 {
2856        let inflight = TestRun::count_inflight(state.db.pool(), org_ctx.org_id)
2857            .await
2858            .map_err(ApiError::Database)?;
2859        if inflight.total() >= max_concurrent {
2860            return Err(ApiError::ResourceLimitExceeded(format!(
2861                "Concurrent run limit reached ({}/{}).",
2862                inflight.total(),
2863                max_concurrent,
2864            )));
2865        }
2866    }
2867
2868    // ─── Build the runner payload ────────────────────────────────
2869    let payload = build_smoke_payload(&deployment, &req)?;
2870
2871    // ─── Enqueue test_runs row ───────────────────────────────────
2872    let run = TestRun::enqueue(
2873        state.db.pool(),
2874        EnqueueTestRun {
2875            suite_id: deployment.id,
2876            org_id: org_ctx.org_id,
2877            kind: "smoke",
2878            triggered_by: "manual",
2879            triggered_by_user: Some(user_id),
2880            git_ref: None,
2881            git_sha: None,
2882        },
2883    )
2884    .await
2885    .map_err(ApiError::Database)?;
2886
2887    // ─── Push onto the Redis queue for the runner ────────────────
2888    if let Err(e) = crate::run_queue::enqueue(
2889        state.redis.as_ref(),
2890        crate::run_queue::EnqueuedJob {
2891            run_id: run.id,
2892            org_id: run.org_id,
2893            source_id: deployment.id,
2894            kind: "smoke",
2895            payload,
2896        },
2897    )
2898    .await
2899    {
2900        // Match the chaos handler's behaviour: log the failure but still
2901        // return the queued row. The runner will pick it up when the
2902        // queue is healthy again, and the test_runs status reflects that
2903        // it never left 'queued' in the meantime.
2904        tracing::error!(run_id = %run.id, error = %e, "failed to enqueue smoke run");
2905    }
2906
2907    Ok(Json(run))
2908}
2909
2910/// Build the JSON payload the runner's `SmokeTestExecutor` consumes.
2911/// Pre-flight validates that the deployment has the URLs the executor
2912/// needs (base + spec) so a "missing field" doesn't surface as a
2913/// runner-side `errored` run after queueing.
2914fn build_smoke_payload(
2915    deployment: &HostedMock,
2916    req: &TriggerSmokeRunRequest,
2917) -> ApiResult<serde_json::Value> {
2918    let base_url =
2919        deployment.deployment_url.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
2920            ApiError::InvalidRequest(
2921                "Deployment has no public URL — wait for the deploy to finish before running smoke"
2922                    .to_string(),
2923            )
2924        })?;
2925    let spec_url =
2926        deployment
2927            .openapi_spec_url
2928            .as_deref()
2929            .filter(|s| !s.is_empty())
2930            .ok_or_else(|| {
2931                ApiError::InvalidRequest(
2932                    "Deployment has no OpenAPI spec — upload one before running smoke".to_string(),
2933                )
2934            })?;
2935
2936    let mut payload = serde_json::json!({
2937        "deployment_id": deployment.id,
2938        "base_url": base_url,
2939        "openapi_spec_url": spec_url,
2940    });
2941    let obj = payload
2942        .as_object_mut()
2943        .expect("payload was constructed as an object on the line above");
2944
2945    if let Some(budget) = req.latency_budget_ms {
2946        obj.insert("latency_budget_ms".into(), budget.into());
2947    }
2948    if let Some(methods) = req.methods.as_ref() {
2949        // `to_value` on a Vec<String> is infallible in practice, but
2950        // bail with a clean error rather than panic if it ever isn't.
2951        let v = serde_json::to_value(methods)
2952            .map_err(|e| ApiError::InvalidRequest(format!("invalid methods array: {e}")))?;
2953        obj.insert("methods".into(), v);
2954    }
2955
2956    Ok(payload)
2957}
2958
/// Unit tests for `build_smoke_payload`: payload contents, optional
/// overrides, and rejection of missing or empty URLs.
#[cfg(test)]
mod smoke_trigger_tests {
    use super::*;

    /// Build a `HostedMock` fixture with the given status and URLs; every
    /// other field gets a fixed placeholder value.
    fn deployment_with(
        status: &str,
        deployment_url: Option<&str>,
        spec_url: Option<&str>,
    ) -> HostedMock {
        // Construct the full struct rather than `..Default::default()`:
        // HostedMock doesn't impl Default, and listing every field
        // explicitly means a future schema column addition triggers a
        // compile error here so the test gets a chance to opt in.
        HostedMock {
            id: Uuid::new_v4(),
            org_id: Uuid::new_v4(),
            project_id: None,
            name: "test".to_string(),
            slug: "test".to_string(),
            description: None,
            config_json: serde_json::json!({}),
            openapi_spec_url: spec_url.map(String::from),
            status: status.to_string(),
            deployment_url: deployment_url.map(String::from),
            internal_url: None,
            region: "iad".to_string(),
            instance_type: "shared-cpu-1x".to_string(),
            health_check_url: None,
            last_health_check: None,
            health_status: "unknown".to_string(),
            error_message: None,
            metadata_json: serde_json::json!({}),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            deleted_at: None,
        }
    }

    // The fixtures use the `active` status — the only one `trigger_smoke_run`
    // lets through — even though `build_smoke_payload` itself never reads it.

    #[test]
    fn build_smoke_payload_uses_deployment_urls() {
        let dep = deployment_with(
            "active",
            Some("https://my-mock.fly.dev"),
            Some("https://specs.example.com/abc.json"),
        );
        let req = TriggerSmokeRunRequest::default();
        let payload = build_smoke_payload(&dep, &req).unwrap();
        assert_eq!(payload["base_url"], "https://my-mock.fly.dev");
        assert_eq!(payload["openapi_spec_url"], "https://specs.example.com/abc.json");
        assert_eq!(payload["deployment_id"], serde_json::json!(dep.id));
        assert!(payload.get("latency_budget_ms").is_none());
        assert!(payload.get("methods").is_none());
    }

    #[test]
    fn build_smoke_payload_passes_overrides() {
        let dep = deployment_with("active", Some("https://x"), Some("https://y"));
        let req = TriggerSmokeRunRequest {
            latency_budget_ms: Some(2000),
            methods: Some(vec!["GET".into(), "HEAD".into()]),
        };
        let payload = build_smoke_payload(&dep, &req).unwrap();
        assert_eq!(payload["latency_budget_ms"], 2000);
        assert_eq!(payload["methods"], serde_json::json!(["GET", "HEAD"]));
    }

    #[test]
    fn build_smoke_payload_rejects_missing_deployment_url() {
        let dep = deployment_with("active", None, Some("https://y"));
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("public URL")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    #[test]
    fn build_smoke_payload_rejects_empty_deployment_url() {
        let dep = deployment_with("active", Some(""), Some("https://y"));
        assert!(matches!(
            build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()),
            Err(ApiError::InvalidRequest(_))
        ));
    }

    #[test]
    fn build_smoke_payload_rejects_missing_spec_url() {
        let dep = deployment_with("active", Some("https://x"), None);
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    // Mirror of the empty-deployment-URL case: an empty spec URL must be
    // treated the same as a missing one.
    #[test]
    fn build_smoke_payload_rejects_empty_spec_url() {
        let dep = deployment_with("active", Some("https://x"), Some(""));
        let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }
}