1use axum::{
6 extract::{Multipart, Path, Query, State},
7 http::HeaderMap,
8 response::sse::{Event, KeepAlive, Sse},
9 Json,
10};
11use chrono::{DateTime, Utc};
12use futures_util::stream::Stream;
13use serde::{Deserialize, Serialize};
14use std::convert::Infallible;
15use uuid::Uuid;
16
17use crate::{
18 deployment::flyio::FlyioClient,
19 error::{ApiError, ApiResult},
20 fly_logs::LogEntry,
21 middleware::{
22 permission_check::PermissionChecker, permissions::Permission, resolve_org_context, AuthUser,
23 },
24 models::{
25 feature_usage::FeatureType, AuditEventType, DeploymentLog, DeploymentMetrics,
26 DeploymentStatus, HostedMock, Subscription, SubscriptionStatus, TestRun,
27 },
28 AppState,
29};
30use mockforge_registry_core::models::test_run::EnqueueTestRun;
31use tracing::warn;
32
33pub async fn create_deployment(
35 State(state): State<AppState>,
36 AuthUser(user_id): AuthUser,
37 headers: HeaderMap,
38 Json(request): Json<CreateDeploymentRequest>,
39) -> ApiResult<Json<DeploymentResponse>> {
40 let pool = state.db.pool();
41
42 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
44 .await
45 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
46
47 let checker = PermissionChecker::new(&state);
49 checker
50 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
51 .await?;
52
53 if let Some(subscription) = Subscription::find_by_org(pool, org_ctx.org_id).await? {
71 if subscription.status() == SubscriptionStatus::PastDue {
72 const PAST_DUE_GRACE_SECONDS: i64 = 24 * 60 * 60;
73 let elapsed = (Utc::now() - subscription.updated_at).num_seconds();
74 if elapsed > PAST_DUE_GRACE_SECONDS {
75 return Err(ApiError::PaymentRequired(
78 "Subscription has been past due for over 24 hours. Please update your payment method in the billing portal before deploying new mocks.".to_string(),
79 ));
80 }
81 tracing::info!(
82 org_id = %org_ctx.org_id,
83 past_due_seconds = elapsed,
84 "past_due subscription within 24h grace; allowing deploy"
85 );
86 }
87 }
88
89 let limits = &org_ctx.org.limits_json;
91 let max_hosted_mocks = limits.get("max_hosted_mocks").and_then(|v| v.as_i64()).unwrap_or(0);
92
93 if max_hosted_mocks >= 0 {
94 let existing = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
96
97 let active_count = existing
98 .iter()
99 .filter(|m| {
100 matches!(m.status(), DeploymentStatus::Active | DeploymentStatus::Deploying)
101 })
102 .count();
103
104 if active_count as i64 >= max_hosted_mocks {
105 return Err(ApiError::InvalidRequest(format!(
106 "Hosted mocks limit exceeded. Your plan allows {} active deployments. Upgrade to deploy more.",
107 max_hosted_mocks
108 )));
109 }
110 }
111
112 let generated_slug;
114 let slug = match request.slug.as_deref() {
115 Some(s) => s,
116 None => {
117 generated_slug = request
118 .name
119 .to_lowercase()
120 .chars()
121 .map(|c| {
122 if c.is_alphanumeric() || c == '-' {
123 c
124 } else {
125 '-'
126 }
127 })
128 .collect::<String>()
129 .trim_matches('-')
130 .replace("--", "-");
131 &generated_slug
132 }
133 };
134
135 if !slug.chars().all(|c| c.is_alphanumeric() || c == '-') {
136 return Err(ApiError::InvalidRequest(
137 "Slug must contain only alphanumeric characters and hyphens".to_string(),
138 ));
139 }
140
141 if state.store.find_hosted_mock_by_slug(org_ctx.org_id, slug).await?.is_some() {
143 return Err(ApiError::InvalidRequest(format!(
144 "A deployment with slug '{}' already exists",
145 slug
146 )));
147 }
148
149 let mut enabled_protocols = request.enabled_protocols.clone().unwrap_or_default();
154 if !enabled_protocols.contains(&crate::models::Protocol::Http) {
155 enabled_protocols.insert(0, crate::models::Protocol::Http);
156 }
157 let plan = org_ctx.org.plan.clone();
158 if !crate::models::protocols_allowed_on_plan(&enabled_protocols, &plan) {
159 let blocked: Vec<String> = enabled_protocols
160 .iter()
161 .filter(|p| !crate::models::protocols_allowed_on_plan(&[**p], &plan))
162 .map(|p| format!("{:?}", p))
163 .collect();
164 return Err(ApiError::InvalidRequest(format!(
165 "These protocols require a higher plan than '{}': {}",
166 plan,
167 blocked.join(", ")
168 )));
169 }
170
171 let mut config_json = request.config_json.clone();
175 if !config_json.is_object() {
176 config_json = serde_json::json!({});
177 }
178 if let Some(obj) = config_json.as_object_mut() {
179 obj.insert(
180 "enabled_protocols".to_string(),
181 serde_json::to_value(&enabled_protocols).unwrap_or(serde_json::Value::Null),
182 );
183 if let Some(upstream) = request.upstream_url.as_ref() {
184 let trimmed = upstream.trim();
185 if !trimmed.is_empty() {
186 obj.insert(
187 "upstream_url".to_string(),
188 serde_json::Value::String(trimmed.to_string()),
189 );
190 }
191 }
192 }
193
194 let deployment = state
196 .store
197 .create_hosted_mock(
198 org_ctx.org_id,
199 request.project_id,
200 &request.name,
201 slug,
202 request.description.as_deref(),
203 config_json,
204 request.openapi_spec_url.as_deref(),
205 request.region.as_deref(),
206 )
207 .await?;
208
209 DeploymentLog::create(
211 pool,
212 deployment.id,
213 "info",
214 "Deployment created",
215 Some(serde_json::json!({
216 "name": request.name,
217 "slug": slug,
218 })),
219 )
220 .await
221 .map_err(ApiError::Database)?;
222
223 state
226 .store
227 .update_hosted_mock_status(deployment.id, DeploymentStatus::Pending, None)
228 .await?;
229
230 state
232 .store
233 .record_feature_usage(
234 org_ctx.org_id,
235 Some(user_id),
236 FeatureType::HostedMockDeploy,
237 Some(serde_json::json!({
238 "deployment_id": deployment.id,
239 "name": request.name,
240 "slug": slug,
241 })),
242 )
243 .await;
244
245 let ip_address = headers
247 .get("X-Forwarded-For")
248 .or_else(|| headers.get("X-Real-IP"))
249 .and_then(|h| h.to_str().ok())
250 .map(|s| s.split(',').next().unwrap_or(s).trim());
251 let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
252
253 state
254 .store
255 .record_audit_event(
256 org_ctx.org_id,
257 Some(user_id),
258 AuditEventType::DeploymentCreated,
259 format!("Hosted mock deployment '{}' created", request.name),
260 Some(serde_json::json!({
261 "deployment_id": deployment.id,
262 "name": request.name,
263 "slug": slug,
264 "project_id": request.project_id,
265 })),
266 ip_address,
267 user_agent,
268 )
269 .await;
270
271 let deployment = state.store.find_hosted_mock_by_id(deployment.id).await?.ok_or_else(|| {
273 ApiError::Internal(anyhow::anyhow!("Failed to retrieve created deployment"))
274 })?;
275
276 Ok(Json(DeploymentResponse::from(deployment)))
277}
278
279pub async fn list_deployments(
281 State(state): State<AppState>,
282 AuthUser(user_id): AuthUser,
283 headers: HeaderMap,
284) -> ApiResult<Json<Vec<DeploymentResponse>>> {
285 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
287 .await
288 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
289
290 let deployments = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
292
293 let responses: Vec<DeploymentResponse> =
294 deployments.into_iter().map(DeploymentResponse::from).collect();
295
296 Ok(Json(responses))
297}
298
299pub async fn get_deployment(
301 State(state): State<AppState>,
302 AuthUser(user_id): AuthUser,
303 headers: HeaderMap,
304 Path(deployment_id): Path<Uuid>,
305) -> ApiResult<Json<DeploymentResponse>> {
306 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
308 .await
309 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
310
311 let deployment = state
313 .store
314 .find_hosted_mock_by_id(deployment_id)
315 .await?
316 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
317
318 if deployment.org_id != org_ctx.org_id {
320 return Err(ApiError::InvalidRequest(
321 "You don't have access to this deployment".to_string(),
322 ));
323 }
324
325 Ok(Json(DeploymentResponse::from(deployment)))
326}
327
328pub async fn update_deployment_status(
330 State(state): State<AppState>,
331 AuthUser(user_id): AuthUser,
332 headers: HeaderMap,
333 Path(deployment_id): Path<Uuid>,
334 Json(request): Json<UpdateStatusRequest>,
335) -> ApiResult<Json<DeploymentResponse>> {
336 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
338 .await
339 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
340
341 let checker = PermissionChecker::new(&state);
343 checker
344 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
345 .await?;
346
347 let deployment = state
349 .store
350 .find_hosted_mock_by_id(deployment_id)
351 .await?
352 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
353
354 if deployment.org_id != org_ctx.org_id {
356 return Err(ApiError::InvalidRequest(
357 "You don't have access to this deployment".to_string(),
358 ));
359 }
360
361 let status = DeploymentStatus::from_str(&request.status)
363 .ok_or_else(|| ApiError::InvalidRequest("Invalid status".to_string()))?;
364
365 state
366 .store
367 .update_hosted_mock_status(deployment_id, status, request.error_message.as_deref())
368 .await?;
369
370 if request.deployment_url.is_some() || request.internal_url.is_some() {
372 state
373 .store
374 .update_hosted_mock_urls(
375 deployment_id,
376 request.deployment_url.as_deref(),
377 request.internal_url.as_deref(),
378 )
379 .await?;
380 }
381
382 let deployment = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
384 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
385 })?;
386
387 if let Ok(Some(org)) = state.store.find_organization_by_id(deployment.org_id).await {
389 if let Ok(Some(owner)) = state.store.find_user_by_id(org.owner_id).await {
390 let status_str = format!("{:?}", deployment.status()).to_lowercase();
391 let email_msg = crate::email::EmailService::generate_deployment_status_email(
392 &owner.username,
393 &owner.email,
394 &deployment.name,
395 &status_str,
396 deployment.deployment_url.as_deref(),
397 request.error_message.as_deref(),
398 );
399
400 tokio::spawn(async move {
401 match crate::email::EmailService::from_env() {
402 Ok(email_service) => {
403 if let Err(e) = email_service.send(email_msg).await {
404 tracing::warn!("Failed to send deployment status email: {}", e);
405 }
406 }
407 Err(e) => {
408 tracing::warn!("Failed to create email service: {}", e);
409 }
410 }
411 });
412 }
413 }
414
415 Ok(Json(DeploymentResponse::from(deployment)))
416}
417
418pub async fn delete_deployment(
420 State(state): State<AppState>,
421 AuthUser(user_id): AuthUser,
422 headers: HeaderMap,
423 Path(deployment_id): Path<Uuid>,
424) -> ApiResult<Json<serde_json::Value>> {
425 let pool = state.db.pool();
426
427 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
429 .await
430 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
431
432 let checker = PermissionChecker::new(&state);
434 checker
435 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockDelete)
436 .await?;
437
438 let deployment = state
440 .store
441 .find_hosted_mock_by_id(deployment_id)
442 .await?
443 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
444
445 if deployment.org_id != org_ctx.org_id {
447 return Err(ApiError::InvalidRequest(
448 "You don't have access to this deployment".to_string(),
449 ));
450 }
451
452 let ip_address = headers
454 .get("X-Forwarded-For")
455 .or_else(|| headers.get("X-Real-IP"))
456 .and_then(|h| h.to_str().ok())
457 .map(|s| s.split(',').next().unwrap_or(s).trim());
458 let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
459
460 state
461 .store
462 .record_audit_event(
463 org_ctx.org_id,
464 Some(user_id),
465 AuditEventType::DeploymentDeleted,
466 format!("Hosted mock deployment '{}' deleted", deployment.name),
467 Some(serde_json::json!({
468 "deployment_id": deployment.id,
469 "name": deployment.name,
470 "slug": deployment.slug,
471 })),
472 ip_address,
473 user_agent,
474 )
475 .await;
476
477 state
479 .store
480 .update_hosted_mock_status(deployment_id, DeploymentStatus::Deleting, None)
481 .await?;
482
483 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
486 let flyio_client = FlyioClient::new(flyio_token);
487
488 let app_name = format!(
490 "mockforge-{}-{}",
491 deployment
492 .org_id
493 .to_string()
494 .replace("-", "")
495 .chars()
496 .take(8)
497 .collect::<String>(),
498 deployment.slug
499 );
500
501 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
503
504 if let Some(machine_id) = machine_id {
505 match flyio_client.delete_machine(&app_name, machine_id).await {
507 Ok(_) => {
508 DeploymentLog::create(
509 pool,
510 deployment_id,
511 "info",
512 &format!("Deleted Fly.io machine: {}", machine_id),
513 None,
514 )
515 .await
516 .ok();
517 }
518 Err(e) => {
519 warn!("Failed to delete Fly.io machine {}: {}", machine_id, e);
520 DeploymentLog::create(
521 pool,
522 deployment_id,
523 "warning",
524 &format!("Failed to delete Fly.io machine: {}", e),
525 None,
526 )
527 .await
528 .ok();
529 }
531 }
532 } else {
533 warn!(
535 "Machine ID not found in metadata for deployment {}, attempting to list machines",
536 deployment_id
537 );
538 match flyio_client.list_machines(&app_name).await {
539 Ok(machines) => {
540 for machine in machines {
541 if let Err(e) = flyio_client.delete_machine(&app_name, &machine.id).await {
542 warn!("Failed to delete Fly.io machine {}: {}", machine.id, e);
543 } else {
544 DeploymentLog::create(
545 pool,
546 deployment_id,
547 "info",
548 &format!("Deleted Fly.io machine: {}", machine.id),
549 None,
550 )
551 .await
552 .ok();
553 }
554 }
555 }
556 Err(e) => {
557 warn!("Failed to list Fly.io machines for app {}: {}", app_name, e);
558 }
560 }
561 }
562
563 match flyio_client.delete_app(&app_name).await {
565 Ok(_) => {
566 DeploymentLog::create(
567 pool,
568 deployment_id,
569 "info",
570 &format!("Deleted Fly.io app: {}", app_name),
571 None,
572 )
573 .await
574 .ok();
575 }
576 Err(e) => {
577 warn!("Failed to delete Fly.io app {}: {}", app_name, e);
578 }
579 }
580 }
581
582 state.store.delete_hosted_mock(deployment_id).await?;
584
585 DeploymentLog::create(pool, deployment_id, "info", "Deployment deleted successfully", None)
586 .await
587 .ok(); Ok(Json(serde_json::json!({
590 "success": true,
591 "message": "Deployment deleted"
592 })))
593}
594
595#[derive(Debug, Deserialize, Default)]
597pub struct RedeployRequest {
598 pub config_json: Option<serde_json::Value>,
600 pub openapi_spec_url: Option<String>,
602}
603
604pub async fn redeploy_deployment(
608 State(state): State<AppState>,
609 AuthUser(user_id): AuthUser,
610 headers: HeaderMap,
611 Path(deployment_id): Path<Uuid>,
612 body: Option<Json<RedeployRequest>>,
613) -> ApiResult<Json<serde_json::Value>> {
614 let pool = state.db.pool();
615
616 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
618 .await
619 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
620
621 let checker = PermissionChecker::new(&state);
623 checker
624 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
625 .await?;
626 let deployment = state
630 .store
631 .find_hosted_mock_by_id(deployment_id)
632 .await?
633 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
634
635 if deployment.org_id != org_ctx.org_id {
637 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
638 }
639
640 let status = deployment.status();
642 if !matches!(status, DeploymentStatus::Active | DeploymentStatus::Failed) {
643 return Err(ApiError::InvalidRequest(format!(
644 "Cannot redeploy a deployment with status '{}'. Must be 'active' or 'failed'.",
645 status
646 )));
647 }
648
649 let request = body.map(|b| b.0).unwrap_or_default();
651 if request.config_json.is_some() || request.openapi_spec_url.is_some() {
652 let mut query = String::from("UPDATE hosted_mocks SET updated_at = NOW()");
653 let mut param_count = 0;
654
655 if request.config_json.is_some() {
656 param_count += 1;
657 query.push_str(&format!(", config_json = ${}", param_count));
658 }
659 if request.openapi_spec_url.is_some() {
660 param_count += 1;
661 query.push_str(&format!(", openapi_spec_url = ${}", param_count));
662 }
663 param_count += 1;
664 query.push_str(&format!(" WHERE id = ${}", param_count));
665
666 let mut q = sqlx::query(&query);
667 if let Some(ref config) = request.config_json {
668 q = q.bind(config);
669 }
670 if let Some(ref spec_url) = request.openapi_spec_url {
671 q = q.bind(spec_url);
672 }
673 q = q.bind(deployment_id);
674 q.execute(pool).await.map_err(|e| {
675 ApiError::Internal(anyhow::anyhow!("Failed to update deployment: {}", e))
676 })?;
677 }
678
679 state
681 .store
682 .update_hosted_mock_status(deployment_id, DeploymentStatus::Deploying, None)
683 .await?;
684
685 DeploymentLog::create(pool, deployment_id, "info", "Redeployment initiated", None)
686 .await
687 .ok();
688
689 let pool_clone = pool.clone();
691 let deployment_id_clone = deployment_id;
692 tokio::spawn(async move {
693 let pool = &pool_clone;
694
695 let updated_deployment = match HostedMock::find_by_id(pool, deployment_id_clone).await {
697 Ok(Some(d)) => d,
698 Ok(None) => {
699 tracing::error!("Deployment {} not found during redeploy", deployment_id_clone);
700 return;
701 }
702 Err(e) => {
703 tracing::error!(
704 "Failed to fetch deployment {} for redeploy: {}",
705 deployment_id_clone,
706 e
707 );
708 return;
709 }
710 };
711
712 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
714 let flyio_client = FlyioClient::new(flyio_token);
715
716 let machine_id = updated_deployment
717 .metadata_json
718 .get("flyio_machine_id")
719 .and_then(|v| v.as_str());
720
721 if let Some(machine_id) = machine_id {
722 let app_name = format!(
723 "mockforge-{}-{}",
724 updated_deployment
725 .org_id
726 .to_string()
727 .replace('-', "")
728 .chars()
729 .take(8)
730 .collect::<String>(),
731 updated_deployment.slug
732 );
733
734 let mut env = std::collections::HashMap::new();
736 env.insert(
737 "MOCKFORGE_DEPLOYMENT_ID".to_string(),
738 updated_deployment.id.to_string(),
739 );
740 env.insert("MOCKFORGE_ORG_ID".to_string(), updated_deployment.org_id.to_string());
741 if let Ok(config_str) = serde_json::to_string(&updated_deployment.config_json) {
742 env.insert("MOCKFORGE_CONFIG".to_string(), config_str);
743 }
744 env.insert("PORT".to_string(), "3000".to_string());
745
746 if let Some(ref spec_url) = updated_deployment.openapi_spec_url {
747 env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
748 }
749
750 let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
751 .unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
752
753 use crate::deployment::flyio::{
754 FlyioCheck, FlyioMachineConfig, FlyioPort, FlyioRegistryAuth, FlyioService,
755 };
756
757 let services = vec![FlyioService {
758 protocol: "tcp".to_string(),
759 internal_port: 3000,
760 ports: vec![
761 FlyioPort {
762 port: 80,
763 handlers: vec!["http".to_string()],
764 },
765 FlyioPort {
766 port: 443,
767 handlers: vec!["tls".to_string(), "http".to_string()],
768 },
769 ],
770 }];
771
772 let mut checks = std::collections::HashMap::new();
773 checks.insert(
774 "alive".to_string(),
775 FlyioCheck {
776 check_type: "http".to_string(),
777 port: 3000,
778 grace_period: "10s".to_string(),
779 interval: "15s".to_string(),
780 method: "GET".to_string(),
781 timeout: "2s".to_string(),
782 tls_skip_verify: false,
783 path: Some("/health/live".to_string()),
784 },
785 );
786
787 let machine_config = FlyioMachineConfig {
788 image,
789 env,
790 services,
791 checks: Some(checks),
792 guest: None,
797 };
798
799 let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
801 std::env::var("DOCKER_REGISTRY_SERVER"),
802 std::env::var("DOCKER_REGISTRY_USERNAME"),
803 std::env::var("DOCKER_REGISTRY_PASSWORD"),
804 ) {
805 Some(FlyioRegistryAuth {
806 server,
807 username,
808 password,
809 })
810 } else if machine_config.image.starts_with("registry.fly.io/") {
811 Some(FlyioRegistryAuth {
812 server: "registry.fly.io".to_string(),
813 username: "x".to_string(),
814 password: flyio_client.api_token().to_string(),
815 })
816 } else {
817 None
818 };
819
820 match flyio_client
821 .update_machine(&app_name, machine_id, machine_config, registry_auth)
822 .await
823 {
824 Ok(_) => {
825 let _ = DeploymentLog::create(
826 pool,
827 deployment_id_clone,
828 "info",
829 "Machine updated and restarting",
830 None,
831 )
832 .await;
833 }
834 Err(e) => {
835 tracing::error!("Redeployment failed for {}: {:#}", deployment_id_clone, e);
836 let _ = HostedMock::update_status(
837 pool,
838 deployment_id_clone,
839 DeploymentStatus::Failed,
840 Some(&format!("Redeployment failed: {}", e)),
841 )
842 .await;
843 let _ = DeploymentLog::create(
844 pool,
845 deployment_id_clone,
846 "error",
847 &format!("Redeployment failed: {}", e),
848 None,
849 )
850 .await;
851 return;
852 }
853 }
854 } else {
855 tracing::error!(
856 "No Fly.io machine ID found for deployment {}",
857 deployment_id_clone
858 );
859 let _ = HostedMock::update_status(
860 pool,
861 deployment_id_clone,
862 DeploymentStatus::Failed,
863 Some("No Fly.io machine ID found in deployment metadata"),
864 )
865 .await;
866 return;
867 }
868 }
869
870 let _ =
872 HostedMock::update_status(pool, deployment_id_clone, DeploymentStatus::Active, None)
873 .await;
874
875 let _ = DeploymentLog::create(
876 pool,
877 deployment_id_clone,
878 "info",
879 "Redeployment completed successfully",
880 None,
881 )
882 .await;
883
884 tracing::info!("Successfully redeployed mock service: {}", deployment_id_clone);
885 });
886
887 Ok(Json(serde_json::json!({
888 "id": deployment_id,
889 "status": "deploying",
890 "message": "Redeployment initiated"
891 })))
892}
893
894fn deployment_app_name(deployment: &HostedMock) -> String {
895 format!(
896 "mockforge-{}-{}",
897 deployment
898 .org_id
899 .to_string()
900 .replace('-', "")
901 .chars()
902 .take(8)
903 .collect::<String>(),
904 deployment.slug
905 )
906}
907
908pub async fn stop_deployment(
913 State(state): State<AppState>,
914 AuthUser(user_id): AuthUser,
915 headers: HeaderMap,
916 Path(deployment_id): Path<Uuid>,
917) -> ApiResult<Json<DeploymentResponse>> {
918 let pool = state.db.pool();
919
920 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
921 .await
922 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
923
924 let checker = PermissionChecker::new(&state);
925 checker
926 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
927 .await?;
928
929 let deployment = state
930 .store
931 .find_hosted_mock_by_id(deployment_id)
932 .await?
933 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
934
935 if deployment.org_id != org_ctx.org_id {
936 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
937 }
938
939 let status = deployment.status();
940 if !matches!(status, DeploymentStatus::Active) {
941 return Err(ApiError::InvalidRequest(format!(
942 "Cannot stop a deployment with status '{}'. Must be 'active'.",
943 status
944 )));
945 }
946
947 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
948 let flyio_client = FlyioClient::new(flyio_token);
949 let app_name = deployment_app_name(&deployment);
950 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
951
952 if let Some(machine_id) = machine_id {
953 flyio_client.stop_machine(&app_name, machine_id).await.map_err(|e| {
954 ApiError::Internal(anyhow::anyhow!("Failed to stop machine: {}", e))
955 })?;
956 } else {
957 warn!(
958 "No Fly.io machine ID found for deployment {}; marking as stopped anyway",
959 deployment_id
960 );
961 }
962 }
963
964 state
965 .store
966 .update_hosted_mock_status(deployment_id, DeploymentStatus::Stopped, None)
967 .await?;
968
969 DeploymentLog::create(pool, deployment_id, "info", "Deployment stopped", None)
970 .await
971 .ok();
972
973 let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
974 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
975 })?;
976
977 Ok(Json(DeploymentResponse::from(updated)))
978}
979
980pub async fn start_deployment(
982 State(state): State<AppState>,
983 AuthUser(user_id): AuthUser,
984 headers: HeaderMap,
985 Path(deployment_id): Path<Uuid>,
986) -> ApiResult<Json<DeploymentResponse>> {
987 let pool = state.db.pool();
988
989 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
990 .await
991 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
992
993 let checker = PermissionChecker::new(&state);
994 checker
995 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
996 .await?;
997
998 let deployment = state
999 .store
1000 .find_hosted_mock_by_id(deployment_id)
1001 .await?
1002 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1003
1004 if deployment.org_id != org_ctx.org_id {
1005 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
1006 }
1007
1008 let status = deployment.status();
1009 if !matches!(status, DeploymentStatus::Stopped) {
1010 return Err(ApiError::InvalidRequest(format!(
1011 "Cannot start a deployment with status '{}'. Must be 'stopped'.",
1012 status
1013 )));
1014 }
1015
1016 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
1017 let flyio_client = FlyioClient::new(flyio_token);
1018 let app_name = deployment_app_name(&deployment);
1019 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
1020
1021 if let Some(machine_id) = machine_id {
1022 flyio_client.start_machine(&app_name, machine_id).await.map_err(|e| {
1023 ApiError::Internal(anyhow::anyhow!("Failed to start machine: {}", e))
1024 })?;
1025 } else {
1026 return Err(ApiError::InvalidRequest(
1027 "No Fly.io machine ID found in deployment metadata; cannot start".to_string(),
1028 ));
1029 }
1030 }
1031
1032 state
1033 .store
1034 .update_hosted_mock_status(deployment_id, DeploymentStatus::Active, None)
1035 .await?;
1036
1037 DeploymentLog::create(pool, deployment_id, "info", "Deployment started", None)
1038 .await
1039 .ok();
1040
1041 let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
1042 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
1043 })?;
1044
1045 Ok(Json(DeploymentResponse::from(updated)))
1046}
1047
1048pub async fn get_deployment_logs(
1050 State(state): State<AppState>,
1051 AuthUser(user_id): AuthUser,
1052 headers: HeaderMap,
1053 Path(deployment_id): Path<Uuid>,
1054) -> ApiResult<Json<Vec<LogResponse>>> {
1055 let pool = state.db.pool();
1056
1057 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1059 .await
1060 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1061
1062 let deployment = HostedMock::find_by_id(pool, deployment_id)
1064 .await
1065 .map_err(ApiError::Database)?
1066 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1067
1068 if deployment.org_id != org_ctx.org_id {
1070 return Err(ApiError::InvalidRequest(
1071 "You don't have access to this deployment".to_string(),
1072 ));
1073 }
1074
1075 let logs = DeploymentLog::find_by_mock(pool, deployment_id, Some(100))
1077 .await
1078 .map_err(ApiError::Database)?;
1079
1080 let responses: Vec<LogResponse> = logs.into_iter().map(LogResponse::from).collect();
1081
1082 Ok(Json(responses))
1083}
1084
1085#[derive(Debug, Deserialize)]
1087pub struct RuntimeLogsQuery {
1088 pub limit: Option<u32>,
1090 pub since: Option<String>,
1092}
1093
1094pub async fn get_runtime_logs(
1102 State(state): State<AppState>,
1103 AuthUser(user_id): AuthUser,
1104 headers: HeaderMap,
1105 Path(deployment_id): Path<Uuid>,
1106 Query(params): Query<RuntimeLogsQuery>,
1107) -> ApiResult<Json<Vec<LogEntry>>> {
1108 let pool = state.db.pool();
1109
1110 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1111 .await
1112 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1113
1114 let deployment = HostedMock::find_by_id(pool, deployment_id)
1115 .await
1116 .map_err(ApiError::Database)?
1117 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1118
1119 if deployment.org_id != org_ctx.org_id {
1120 return Err(ApiError::InvalidRequest(
1121 "You don't have access to this deployment".to_string(),
1122 ));
1123 }
1124
1125 let Some(client) = crate::fly_logs::global() else {
1126 return Ok(Json(Vec::new()));
1131 };
1132
1133 let since = params
1134 .since
1135 .as_deref()
1136 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1137 .map(|d| d.with_timezone(&Utc));
1138
1139 let app_name = deployment.fly_app_name();
1140 let entries = client.fetch_recent(&app_name, since, params.limit).await.map_err(|e| {
1141 warn!(error = %e, app_name = %app_name, "Fly runtime logs fetch failed");
1142 ApiError::Internal(anyhow::anyhow!("Fly logs query failed: {}", e))
1143 })?;
1144
1145 Ok(Json(entries))
1146}
1147
1148pub async fn stream_runtime_logs(
1152 State(state): State<AppState>,
1153 AuthUser(user_id): AuthUser,
1154 headers: HeaderMap,
1155 Path(deployment_id): Path<Uuid>,
1156) -> ApiResult<Sse<impl Stream<Item = Result<Event, Infallible>>>> {
1157 let pool = state.db.pool();
1158 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1159 .await
1160 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1161
1162 let deployment = HostedMock::find_by_id(pool, deployment_id)
1163 .await
1164 .map_err(ApiError::Database)?
1165 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1166
1167 if deployment.org_id != org_ctx.org_id {
1168 return Err(ApiError::InvalidRequest(
1169 "You don't have access to this deployment".to_string(),
1170 ));
1171 }
1172
1173 let app_name = deployment.fly_app_name();
1174 let stream = build_runtime_logs_stream(app_name);
1175
1176 Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
1177}
1178
1179fn build_runtime_logs_stream(app_name: String) -> impl Stream<Item = Result<Event, Infallible>> {
1183 use futures_util::stream::unfold;
1184
1185 struct State {
1189 cursor: DateTime<Utc>,
1190 client: Option<&'static crate::fly_logs::FlyLogsClient>,
1191 app_name: String,
1192 emitted_unconfigured: bool,
1193 }
1194
1195 let state = State {
1196 cursor: Utc::now() - chrono::Duration::seconds(30),
1197 client: crate::fly_logs::global(),
1198 app_name,
1199 emitted_unconfigured: false,
1200 };
1201
1202 unfold(state, |mut st| async move {
1203 if st.client.is_none() && !st.emitted_unconfigured {
1207 st.emitted_unconfigured = true;
1208 let event = Event::default()
1209 .event("config")
1210 .data("Fly runtime logs are not configured (FLYIO_API_TOKEN unset)");
1211 return Some((Ok(event), st));
1212 }
1213
1214 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1215
1216 let Some(client) = st.client else {
1217 let event = Event::default().comment("idle");
1220 return Some((Ok(event), st));
1221 };
1222
1223 match client.fetch_recent(&st.app_name, Some(st.cursor), None).await {
1224 Ok(entries) if entries.is_empty() => {
1225 let event = Event::default().comment("no-new-logs");
1226 Some((Ok(event), st))
1227 }
1228 Ok(entries) => {
1229 if let Some(latest) = entries.iter().map(|e| e.timestamp).max() {
1231 st.cursor = latest;
1232 }
1233 let payload = serde_json::to_string(&entries).unwrap_or_else(|_| "[]".to_string());
1234 let event = Event::default().event("logs").data(payload);
1235 Some((Ok(event), st))
1236 }
1237 Err(err) => {
1238 let payload = serde_json::json!({ "error": err.to_string() }).to_string();
1239 let event = Event::default().event("error").data(payload);
1240 Some((Ok(event), st))
1241 }
1242 }
1243 })
1244}
1245
1246#[derive(Debug, Deserialize, Serialize)]
1250pub struct RuntimeRequestEvent {
1251 pub timestamp: DateTime<Utc>,
1252 pub method: String,
1253 pub path: String,
1254 pub status: u16,
1255 pub latency_ms: u32,
1256 #[serde(default)]
1257 pub matched_route: Option<String>,
1258 #[serde(default)]
1259 pub client_ip: Option<String>,
1260 #[serde(default)]
1261 pub user_agent: Option<String>,
1262 #[serde(default)]
1263 pub request_id: Option<String>,
1264 #[serde(default)]
1265 pub bytes_in: Option<i64>,
1266 #[serde(default)]
1267 pub bytes_out: Option<i64>,
1268}
1269
1270#[derive(Debug, Deserialize)]
1271pub struct IngestPayload {
1272 pub events: Vec<RuntimeRequestEvent>,
1273}
1274
1275#[derive(Debug, Serialize)]
1276pub struct IngestResponse {
1277 pub accepted: usize,
1278}
1279
1280pub async fn ingest_runtime_logs(
1287 State(state): State<AppState>,
1288 Path(deployment_id): Path<Uuid>,
1289 headers: HeaderMap,
1290 Json(payload): Json<IngestPayload>,
1291) -> ApiResult<Json<IngestResponse>> {
1292 let auth = headers
1294 .get("Authorization")
1295 .and_then(|h| h.to_str().ok())
1296 .and_then(|h| h.strip_prefix("Bearer "))
1297 .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1298
1299 let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1300 auth,
1301 &state.config.jwt_secret,
1302 )
1303 .map_err(|e| {
1304 tracing::warn!(error = %e, "Deployment ingest token rejected");
1305 ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1306 })?;
1307
1308 if token_deployment_id != deployment_id {
1309 return Err(ApiError::InvalidRequest(
1310 "Token deployment id does not match URL path".to_string(),
1311 ));
1312 }
1313
1314 if payload.events.is_empty() {
1315 return Ok(Json(IngestResponse { accepted: 0 }));
1316 }
1317
1318 const MAX_BATCH: usize = 500;
1322 let events: Vec<RuntimeRequestEvent> = payload.events.into_iter().take(MAX_BATCH).collect();
1323 let accepted = events.len();
1324
1325 let pool = state.db.pool();
1329 let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1330 for evt in &events {
1331 sqlx::query(
1332 r#"
1333 INSERT INTO runtime_request_logs (
1334 deployment_id, occurred_at, method, path, status, latency_ms,
1335 matched_route, client_ip, user_agent, request_id, bytes_in, bytes_out
1336 )
1337 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
1338 "#,
1339 )
1340 .bind(deployment_id)
1341 .bind(evt.timestamp)
1342 .bind(&evt.method)
1343 .bind(&evt.path)
1344 .bind(evt.status as i16)
1345 .bind(evt.latency_ms as i32)
1346 .bind(evt.matched_route.as_ref())
1347 .bind(evt.client_ip.as_ref())
1348 .bind(evt.user_agent.as_ref())
1349 .bind(evt.request_id.as_ref())
1350 .bind(evt.bytes_in)
1351 .bind(evt.bytes_out)
1352 .execute(&mut *tx)
1353 .await
1354 .map_err(ApiError::Database)?;
1355 }
1356 tx.commit().await.map_err(ApiError::Database)?;
1357
1358 Ok(Json(IngestResponse { accepted }))
1359}
1360
1361#[derive(Debug, Deserialize)]
1371pub struct CaptureIngestRequest {
1372 pub id: String,
1373 pub protocol: String,
1374 pub timestamp: DateTime<Utc>,
1375 pub method: String,
1376 pub path: String,
1377 #[serde(default)]
1378 pub query_params: Option<String>,
1379 pub headers: String,
1380 #[serde(default)]
1381 pub body: Option<String>,
1382 pub body_encoding: String,
1383 #[serde(default)]
1384 pub client_ip: Option<String>,
1385 #[serde(default)]
1386 pub trace_id: Option<String>,
1387 #[serde(default)]
1388 pub span_id: Option<String>,
1389 #[serde(default)]
1390 pub duration_ms: Option<i64>,
1391 #[serde(default)]
1392 pub status_code: Option<i32>,
1393 #[serde(default)]
1394 pub tags: Option<String>,
1395}
1396
1397#[derive(Debug, Deserialize)]
1398pub struct CaptureIngestResponse {
1399 pub status_code: i32,
1400 pub headers: String,
1401 #[serde(default)]
1402 pub body: Option<String>,
1403 pub body_encoding: String,
1404 pub size_bytes: i64,
1405 pub timestamp: DateTime<Utc>,
1406}
1407
1408#[derive(Debug, Deserialize)]
1409pub struct CaptureIngestExchange {
1410 pub request: CaptureIngestRequest,
1411 #[serde(default)]
1412 pub response: Option<CaptureIngestResponse>,
1413}
1414
1415#[derive(Debug, Deserialize)]
1416pub struct CaptureIngestPayload {
1417 pub exchanges: Vec<CaptureIngestExchange>,
1418}
1419
1420pub async fn ingest_runtime_captures(
1426 State(state): State<AppState>,
1427 Path(deployment_id): Path<Uuid>,
1428 headers: HeaderMap,
1429 Json(payload): Json<CaptureIngestPayload>,
1430) -> ApiResult<Json<IngestResponse>> {
1431 let auth = headers
1432 .get("Authorization")
1433 .and_then(|h| h.to_str().ok())
1434 .and_then(|h| h.strip_prefix("Bearer "))
1435 .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1436
1437 let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1438 auth,
1439 &state.config.jwt_secret,
1440 )
1441 .map_err(|e| {
1442 tracing::warn!(error = %e, "Capture ingest token rejected");
1443 ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1444 })?;
1445
1446 if token_deployment_id != deployment_id {
1447 return Err(ApiError::InvalidRequest(
1448 "Token deployment id does not match URL path".to_string(),
1449 ));
1450 }
1451
1452 if payload.exchanges.is_empty() {
1453 return Ok(Json(IngestResponse { accepted: 0 }));
1454 }
1455
1456 const MAX_BATCH: usize = 100;
1460 let exchanges: Vec<CaptureIngestExchange> =
1461 payload.exchanges.into_iter().take(MAX_BATCH).collect();
1462 let accepted = exchanges.len();
1463
1464 let pool = state.db.pool();
1465 let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1466 for exchange in &exchanges {
1467 let req = &exchange.request;
1468 let resp = exchange.response.as_ref();
1469 sqlx::query(
1470 r#"
1471 INSERT INTO runtime_captures (
1472 deployment_id, capture_id, protocol, occurred_at, method, path,
1473 query_params, request_headers, request_body, request_body_encoding,
1474 client_ip, trace_id, span_id, duration_ms, status_code, tags,
1475 response_status_code, response_headers, response_body,
1476 response_body_encoding, response_size_bytes, response_timestamp
1477 )
1478 VALUES (
1479 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
1480 $15, $16, $17, $18, $19, $20, $21, $22
1481 )
1482 ON CONFLICT (deployment_id, capture_id) DO NOTHING
1483 "#,
1484 )
1485 .bind(deployment_id)
1486 .bind(&req.id)
1487 .bind(&req.protocol)
1488 .bind(req.timestamp)
1489 .bind(&req.method)
1490 .bind(&req.path)
1491 .bind(req.query_params.as_ref())
1492 .bind(&req.headers)
1493 .bind(req.body.as_ref())
1494 .bind(&req.body_encoding)
1495 .bind(req.client_ip.as_ref())
1496 .bind(req.trace_id.as_ref())
1497 .bind(req.span_id.as_ref())
1498 .bind(req.duration_ms)
1499 .bind(req.status_code)
1500 .bind(req.tags.as_ref())
1501 .bind(resp.map(|r| r.status_code))
1502 .bind(resp.map(|r| r.headers.as_str()))
1503 .bind(resp.and_then(|r| r.body.as_deref()))
1504 .bind(resp.map(|r| r.body_encoding.as_str()))
1505 .bind(resp.map(|r| r.size_bytes))
1506 .bind(resp.map(|r| r.timestamp))
1507 .execute(&mut *tx)
1508 .await
1509 .map_err(ApiError::Database)?;
1510 }
1511 tx.commit().await.map_err(ApiError::Database)?;
1512
1513 Ok(Json(IngestResponse { accepted }))
1514}
1515
1516#[derive(Debug, Deserialize)]
1517pub struct RuntimeRequestsQuery {
1518 pub limit: Option<u32>,
1520 pub since: Option<String>,
1522}
1523
1524pub async fn get_runtime_requests(
1527 State(state): State<AppState>,
1528 AuthUser(user_id): AuthUser,
1529 headers: HeaderMap,
1530 Path(deployment_id): Path<Uuid>,
1531 Query(params): Query<RuntimeRequestsQuery>,
1532) -> ApiResult<Json<Vec<RuntimeRequestEvent>>> {
1533 let pool = state.db.pool();
1534
1535 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1536 .await
1537 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1538
1539 let deployment = HostedMock::find_by_id(pool, deployment_id)
1540 .await
1541 .map_err(ApiError::Database)?
1542 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1543
1544 if deployment.org_id != org_ctx.org_id {
1545 return Err(ApiError::InvalidRequest(
1546 "You don't have access to this deployment".to_string(),
1547 ));
1548 }
1549
1550 let limit = params.limit.unwrap_or(100).min(500) as i64;
1551 let since = params
1552 .since
1553 .as_deref()
1554 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1555 .map(|d| d.with_timezone(&Utc));
1556
1557 type RuntimeRequestRow = (
1558 DateTime<Utc>,
1559 String,
1560 String,
1561 i16,
1562 i32,
1563 Option<String>,
1564 Option<String>,
1565 Option<String>,
1566 Option<String>,
1567 Option<i64>,
1568 Option<i64>,
1569 );
1570
1571 let rows: Vec<RuntimeRequestRow> = if let Some(since) = since {
1572 sqlx::query_as(
1573 r#"
1574 SELECT occurred_at, method, path, status, latency_ms,
1575 matched_route, client_ip, user_agent, request_id,
1576 bytes_in, bytes_out
1577 FROM runtime_request_logs
1578 WHERE deployment_id = $1 AND occurred_at > $2
1579 ORDER BY occurred_at DESC
1580 LIMIT $3
1581 "#,
1582 )
1583 .bind(deployment_id)
1584 .bind(since)
1585 .bind(limit)
1586 .fetch_all(pool)
1587 .await
1588 .map_err(ApiError::Database)?
1589 } else {
1590 sqlx::query_as(
1591 r#"
1592 SELECT occurred_at, method, path, status, latency_ms,
1593 matched_route, client_ip, user_agent, request_id,
1594 bytes_in, bytes_out
1595 FROM runtime_request_logs
1596 WHERE deployment_id = $1
1597 ORDER BY occurred_at DESC
1598 LIMIT $2
1599 "#,
1600 )
1601 .bind(deployment_id)
1602 .bind(limit)
1603 .fetch_all(pool)
1604 .await
1605 .map_err(ApiError::Database)?
1606 };
1607
1608 let events: Vec<RuntimeRequestEvent> = rows
1609 .into_iter()
1610 .map(|row| RuntimeRequestEvent {
1611 timestamp: row.0,
1612 method: row.1,
1613 path: row.2,
1614 status: row.3 as u16,
1615 latency_ms: row.4 as u32,
1616 matched_route: row.5,
1617 client_ip: row.6,
1618 user_agent: row.7,
1619 request_id: row.8,
1620 bytes_in: row.9,
1621 bytes_out: row.10,
1622 })
1623 .collect();
1624
1625 Ok(Json(events))
1626}
1627
1628async fn proxy_to_deployment_recorder(
1641 deployment: &HostedMock,
1642 path_and_query: &str,
1643) -> ApiResult<axum::http::Response<axum::body::Body>> {
1644 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1645 let Some(base) = base else {
1646 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1647 };
1648 let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1649
1650 let client = reqwest::Client::builder()
1651 .timeout(std::time::Duration::from_secs(10))
1652 .build()
1653 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1654
1655 let resp =
1656 client.get(&url).send().await.map_err(|e| {
1657 ApiError::Internal(anyhow::anyhow!("Recorder proxy fetch failed: {}", e))
1658 })?;
1659
1660 let status = resp.status();
1661 let headers = resp.headers().clone();
1662 let body = resp.bytes().await.map_err(|e| {
1663 ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
1664 })?;
1665
1666 let mut builder = axum::http::Response::builder().status(status);
1667 if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
1668 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
1669 }
1670 builder.body(axum::body::Body::from(body)).map_err(|e| {
1671 ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
1672 })
1673}
1674
1675async fn proxy_to_deployment_state_machines(
1680 deployment: &HostedMock,
1681 path_and_query: &str,
1682) -> ApiResult<axum::http::Response<axum::body::Body>> {
1683 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1684 let Some(base) = base else {
1685 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1686 };
1687 let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1688
1689 let client = reqwest::Client::builder()
1690 .timeout(std::time::Duration::from_secs(10))
1691 .build()
1692 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1693
1694 let resp = client.get(&url).send().await.map_err(|e| {
1695 ApiError::Internal(anyhow::anyhow!("State-machine proxy fetch failed: {}", e))
1696 })?;
1697 let status = resp.status();
1698 let headers = resp.headers().clone();
1699 let body = resp.bytes().await.map_err(|e| {
1700 ApiError::Internal(anyhow::anyhow!("State-machine proxy read body failed: {}", e))
1701 })?;
1702
1703 let mut builder = axum::http::Response::builder().status(status);
1704 if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
1705 builder = builder.header(axum::http::header::CONTENT_TYPE, ct);
1706 }
1707 builder.body(axum::body::Body::from(body)).map_err(|e| {
1708 ApiError::Internal(anyhow::anyhow!("State-machine proxy response build failed: {}", e))
1709 })
1710}
1711
1712pub async fn list_deployment_state_machines(
1715 State(state): State<AppState>,
1716 AuthUser(user_id): AuthUser,
1717 headers: HeaderMap,
1718 Path(deployment_id): Path<Uuid>,
1719) -> ApiResult<axum::http::Response<axum::body::Body>> {
1720 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1721 proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines").await
1722}
1723
1724pub async fn get_deployment_state_machine(
1727 State(state): State<AppState>,
1728 AuthUser(user_id): AuthUser,
1729 headers: HeaderMap,
1730 Path((deployment_id, resource_type)): Path<(Uuid, String)>,
1731) -> ApiResult<axum::http::Response<axum::body::Body>> {
1732 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1733 if resource_type.contains('/') || resource_type.contains('?') || resource_type.contains('#') {
1734 return Err(ApiError::InvalidRequest("Invalid resource type".to_string()));
1735 }
1736 let path = format!("/__mockforge/api/state-machines/{}", urlencoding::encode(&resource_type));
1737 proxy_to_deployment_state_machines(&deployment, &path).await
1738}
1739
1740pub async fn list_deployment_state_machine_instances(
1743 State(state): State<AppState>,
1744 AuthUser(user_id): AuthUser,
1745 headers: HeaderMap,
1746 Path(deployment_id): Path<Uuid>,
1747) -> ApiResult<axum::http::Response<axum::body::Body>> {
1748 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1749 proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines/instances")
1750 .await
1751}
1752
1753#[derive(Debug, Deserialize)]
1754pub struct RecorderCapturesQuery {
1755 pub limit: Option<u32>,
1756 pub since: Option<String>,
1757}
1758
1759pub async fn list_recorder_captures(
1768 State(state): State<AppState>,
1769 AuthUser(user_id): AuthUser,
1770 headers: HeaderMap,
1771 Path(deployment_id): Path<Uuid>,
1772 Query(params): Query<RecorderCapturesQuery>,
1773) -> ApiResult<axum::http::Response<axum::body::Body>> {
1774 let pool = state.db.pool();
1775
1776 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1777 .await
1778 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1779
1780 let deployment = HostedMock::find_by_id(pool, deployment_id)
1781 .await
1782 .map_err(ApiError::Database)?
1783 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1784
1785 if deployment.org_id != org_ctx.org_id {
1786 return Err(ApiError::InvalidRequest(
1787 "You don't have access to this deployment".to_string(),
1788 ));
1789 }
1790
1791 let has_synced: bool = sqlx::query_scalar(
1795 "SELECT EXISTS(SELECT 1 FROM runtime_captures WHERE deployment_id = $1 LIMIT 1)",
1796 )
1797 .bind(deployment_id)
1798 .fetch_one(pool)
1799 .await
1800 .map_err(ApiError::Database)?;
1801
1802 if has_synced {
1803 let limit = params.limit.unwrap_or(100).min(500) as i64;
1804 let since = params
1805 .since
1806 .as_deref()
1807 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1808 .map(|d| d.with_timezone(&Utc));
1809 let captures = list_cloud_captures(pool, deployment_id, limit, since).await?;
1810 let body = serde_json::to_vec(&captures).map_err(|e| {
1811 ApiError::InvalidRequest(format!("Failed to serialize captures: {}", e))
1812 })?;
1813 return Ok(axum::http::Response::builder()
1814 .status(axum::http::StatusCode::OK)
1815 .header("content-type", "application/json")
1816 .body(axum::body::Body::from(body))
1817 .unwrap());
1818 }
1819
1820 let mut qs = String::from("/api/recorder/requests");
1821 let mut sep = '?';
1822 if let Some(limit) = params.limit {
1823 qs.push(sep);
1824 qs.push_str(&format!("limit={}", limit));
1825 sep = '&';
1826 }
1827 if let Some(since) = params.since.as_deref() {
1828 qs.push(sep);
1829 qs.push_str(&format!("since={}", urlencoding::encode(since)));
1830 }
1831
1832 let _ = state;
1833 proxy_to_deployment_recorder(&deployment, &qs).await
1834}
1835
1836async fn list_cloud_captures(
1839 pool: &sqlx::PgPool,
1840 deployment_id: Uuid,
1841 limit: i64,
1842 since: Option<DateTime<Utc>>,
1843) -> ApiResult<Vec<serde_json::Value>> {
1844 type Row = (
1845 String, String, DateTime<Utc>, String, String, Option<String>, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<i64>, Option<i32>, Option<String>, );
1861
1862 let rows: Vec<Row> = if let Some(since) = since {
1863 sqlx::query_as(
1864 r#"
1865 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1866 request_headers, request_body, request_body_encoding,
1867 client_ip, trace_id, span_id, duration_ms, status_code, tags
1868 FROM runtime_captures
1869 WHERE deployment_id = $1 AND occurred_at > $2
1870 ORDER BY occurred_at DESC
1871 LIMIT $3
1872 "#,
1873 )
1874 .bind(deployment_id)
1875 .bind(since)
1876 .bind(limit)
1877 .fetch_all(pool)
1878 .await
1879 .map_err(ApiError::Database)?
1880 } else {
1881 sqlx::query_as(
1882 r#"
1883 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1884 request_headers, request_body, request_body_encoding,
1885 client_ip, trace_id, span_id, duration_ms, status_code, tags
1886 FROM runtime_captures
1887 WHERE deployment_id = $1
1888 ORDER BY occurred_at DESC
1889 LIMIT $2
1890 "#,
1891 )
1892 .bind(deployment_id)
1893 .bind(limit)
1894 .fetch_all(pool)
1895 .await
1896 .map_err(ApiError::Database)?
1897 };
1898
1899 let captures = rows
1900 .into_iter()
1901 .map(|r| {
1902 serde_json::json!({
1903 "id": r.0,
1904 "protocol": r.1,
1905 "timestamp": r.2,
1906 "method": r.3,
1907 "path": r.4,
1908 "query_params": r.5,
1909 "headers": r.6,
1910 "body": r.7,
1911 "body_encoding": r.8,
1912 "client_ip": r.9,
1913 "trace_id": r.10,
1914 "span_id": r.11,
1915 "duration_ms": r.12,
1916 "status_code": r.13,
1917 "tags": r.14,
1918 })
1919 })
1920 .collect();
1921 Ok(captures)
1922}
1923
1924pub async fn get_recorder_capture(
1927 State(state): State<AppState>,
1928 AuthUser(user_id): AuthUser,
1929 headers: HeaderMap,
1930 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
1931) -> ApiResult<axum::http::Response<axum::body::Body>> {
1932 let pool = state.db.pool();
1933
1934 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1935 .await
1936 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1937
1938 let deployment = HostedMock::find_by_id(pool, deployment_id)
1939 .await
1940 .map_err(ApiError::Database)?
1941 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1942
1943 if deployment.org_id != org_ctx.org_id {
1944 return Err(ApiError::InvalidRequest(
1945 "You don't have access to this deployment".to_string(),
1946 ));
1947 }
1948
1949 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
1952 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
1953 }
1954
1955 if let Some(row) = fetch_cloud_capture(pool, deployment_id, &capture_id).await? {
1956 let body = serde_json::to_vec(&row)
1957 .map_err(|e| ApiError::InvalidRequest(format!("Failed to serialize capture: {}", e)))?;
1958 return Ok(axum::http::Response::builder()
1959 .status(axum::http::StatusCode::OK)
1960 .header("content-type", "application/json")
1961 .body(axum::body::Body::from(body))
1962 .unwrap());
1963 }
1964
1965 let path = format!("/api/recorder/requests/{}", urlencoding::encode(&capture_id));
1966 let _ = state;
1967 proxy_to_deployment_recorder(&deployment, &path).await
1968}
1969
1970async fn fetch_cloud_capture(
1973 pool: &sqlx::PgPool,
1974 deployment_id: Uuid,
1975 capture_id: &str,
1976) -> ApiResult<Option<serde_json::Value>> {
1977 type Row = (
1978 String,
1979 String,
1980 DateTime<Utc>,
1981 String,
1982 String,
1983 Option<String>,
1984 String,
1985 Option<String>,
1986 String,
1987 Option<String>,
1988 Option<String>,
1989 Option<String>,
1990 Option<i64>,
1991 Option<i32>,
1992 Option<String>,
1993 );
1994 let row: Option<Row> = sqlx::query_as(
1995 r#"
1996 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1997 request_headers, request_body, request_body_encoding,
1998 client_ip, trace_id, span_id, duration_ms, status_code, tags
1999 FROM runtime_captures
2000 WHERE deployment_id = $1 AND capture_id = $2
2001 LIMIT 1
2002 "#,
2003 )
2004 .bind(deployment_id)
2005 .bind(capture_id)
2006 .fetch_optional(pool)
2007 .await
2008 .map_err(ApiError::Database)?;
2009 Ok(row.map(|r| {
2010 serde_json::json!({
2011 "id": r.0,
2012 "protocol": r.1,
2013 "timestamp": r.2,
2014 "method": r.3,
2015 "path": r.4,
2016 "query_params": r.5,
2017 "headers": r.6,
2018 "body": r.7,
2019 "body_encoding": r.8,
2020 "client_ip": r.9,
2021 "trace_id": r.10,
2022 "span_id": r.11,
2023 "duration_ms": r.12,
2024 "status_code": r.13,
2025 "tags": r.14,
2026 })
2027 }))
2028}
2029
2030pub async fn get_recorder_capture_response(
2034 State(state): State<AppState>,
2035 AuthUser(user_id): AuthUser,
2036 headers: HeaderMap,
2037 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2038) -> ApiResult<axum::http::Response<axum::body::Body>> {
2039 let pool = state.db.pool();
2040
2041 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2042 .await
2043 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2044
2045 let deployment = HostedMock::find_by_id(pool, deployment_id)
2046 .await
2047 .map_err(ApiError::Database)?
2048 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2049
2050 if deployment.org_id != org_ctx.org_id {
2051 return Err(ApiError::InvalidRequest(
2052 "You don't have access to this deployment".to_string(),
2053 ));
2054 }
2055
2056 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2057 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2058 }
2059
2060 if let Some(row) = fetch_cloud_capture_response(pool, deployment_id, &capture_id).await? {
2061 let body = serde_json::to_vec(&row).map_err(|e| {
2062 ApiError::InvalidRequest(format!("Failed to serialize response: {}", e))
2063 })?;
2064 return Ok(axum::http::Response::builder()
2065 .status(axum::http::StatusCode::OK)
2066 .header("content-type", "application/json")
2067 .body(axum::body::Body::from(body))
2068 .unwrap());
2069 }
2070
2071 let path = format!("/api/recorder/requests/{}/response", urlencoding::encode(&capture_id));
2072 let _ = state;
2073 proxy_to_deployment_recorder(&deployment, &path).await
2074}
2075
2076async fn fetch_cloud_capture_response(
2080 pool: &sqlx::PgPool,
2081 deployment_id: Uuid,
2082 capture_id: &str,
2083) -> ApiResult<Option<serde_json::Value>> {
2084 type Row = (Option<i32>, Option<String>, Option<String>, Option<String>, Option<i64>);
2085 let row: Option<Row> = sqlx::query_as(
2086 r#"
2087 SELECT response_status_code, response_headers, response_body,
2088 response_body_encoding, response_size_bytes
2089 FROM runtime_captures
2090 WHERE deployment_id = $1 AND capture_id = $2
2091 LIMIT 1
2092 "#,
2093 )
2094 .bind(deployment_id)
2095 .bind(capture_id)
2096 .fetch_optional(pool)
2097 .await
2098 .map_err(ApiError::Database)?;
2099 Ok(row.and_then(|r| {
2100 let status_code = r.0?;
2101 Some(serde_json::json!({
2102 "status_code": status_code,
2103 "headers": r.1.unwrap_or_else(|| "{}".to_string()),
2104 "body": r.2,
2105 "body_encoding": r.3.unwrap_or_else(|| "utf8".to_string()),
2106 "size_bytes": r.4.unwrap_or(0),
2107 }))
2108 }))
2109}
2110
2111async fn proxy_post_to_deployment_recorder(
2116 deployment: &HostedMock,
2117 path: &str,
2118) -> ApiResult<axum::http::Response<axum::body::Body>> {
2119 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2120 let Some(base) = base else {
2121 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2122 };
2123 let url = format!("{}{}", base.trim_end_matches('/'), path);
2124
2125 let client = reqwest::Client::builder()
2126 .timeout(std::time::Duration::from_secs(10))
2127 .build()
2128 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2129
2130 let resp =
2131 client.post(&url).send().await.map_err(|e| {
2132 ApiError::Internal(anyhow::anyhow!("Recorder proxy POST failed: {}", e))
2133 })?;
2134
2135 let status = resp.status();
2136 let headers = resp.headers().clone();
2137 let body = resp.bytes().await.map_err(|e| {
2138 ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
2139 })?;
2140
2141 let mut builder = axum::http::Response::builder().status(status);
2142 if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
2143 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2144 }
2145 builder.body(axum::body::Body::from(body)).map_err(|e| {
2146 ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
2147 })
2148}
2149
2150async fn check_org_access(
2151 state: &AppState,
2152 user_id: Uuid,
2153 headers: &HeaderMap,
2154 deployment_id: Uuid,
2155) -> ApiResult<HostedMock> {
2156 let pool = state.db.pool();
2157
2158 let org_ctx = resolve_org_context(state, user_id, headers, None)
2159 .await
2160 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2161
2162 let deployment = HostedMock::find_by_id(pool, deployment_id)
2163 .await
2164 .map_err(ApiError::Database)?
2165 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2166
2167 if deployment.org_id != org_ctx.org_id {
2168 return Err(ApiError::InvalidRequest(
2169 "You don't have access to this deployment".to_string(),
2170 ));
2171 }
2172 Ok(deployment)
2173}
2174
2175pub async fn get_recorder_status(
2179 State(state): State<AppState>,
2180 AuthUser(user_id): AuthUser,
2181 headers: HeaderMap,
2182 Path(deployment_id): Path<Uuid>,
2183) -> ApiResult<axum::http::Response<axum::body::Body>> {
2184 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2185 proxy_to_deployment_recorder(&deployment, "/api/recorder/status").await
2186}
2187
2188pub async fn enable_recorder(
2190 State(state): State<AppState>,
2191 AuthUser(user_id): AuthUser,
2192 headers: HeaderMap,
2193 Path(deployment_id): Path<Uuid>,
2194) -> ApiResult<axum::http::Response<axum::body::Body>> {
2195 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2196 proxy_post_to_deployment_recorder(&deployment, "/api/recorder/enable").await
2197}
2198
2199pub async fn disable_recorder(
2201 State(state): State<AppState>,
2202 AuthUser(user_id): AuthUser,
2203 headers: HeaderMap,
2204 Path(deployment_id): Path<Uuid>,
2205) -> ApiResult<axum::http::Response<axum::body::Body>> {
2206 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2207 proxy_post_to_deployment_recorder(&deployment, "/api/recorder/disable").await
2208}
2209
2210pub async fn clear_recorder(
2215 State(state): State<AppState>,
2216 AuthUser(user_id): AuthUser,
2217 headers: HeaderMap,
2218 Path(deployment_id): Path<Uuid>,
2219) -> ApiResult<axum::http::Response<axum::body::Body>> {
2220 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2221 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2223 let Some(base) = base else {
2224 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2225 };
2226 let url = format!("{}/api/recorder/clear", base.trim_end_matches('/'));
2227 let client = reqwest::Client::builder()
2228 .timeout(std::time::Duration::from_secs(10))
2229 .build()
2230 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2231 let resp =
2232 client.delete(&url).send().await.map_err(|e| {
2233 ApiError::Internal(anyhow::anyhow!("Recorder clear proxy failed: {}", e))
2234 })?;
2235 let status = resp.status();
2236 let headers_resp = resp.headers().clone();
2237 let body = resp.bytes().await.map_err(|e| {
2238 ApiError::Internal(anyhow::anyhow!("Recorder clear read body failed: {}", e))
2239 })?;
2240 let mut builder = axum::http::Response::builder().status(status);
2241 if let Some(content_type) = headers_resp.get(axum::http::header::CONTENT_TYPE) {
2242 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2243 }
2244 builder.body(axum::body::Body::from(body)).map_err(|e| {
2245 ApiError::Internal(anyhow::anyhow!("Recorder clear response build failed: {}", e))
2246 })
2247}
2248
2249pub async fn replay_recorder_capture(
2254 State(state): State<AppState>,
2255 AuthUser(user_id): AuthUser,
2256 headers: HeaderMap,
2257 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2258) -> ApiResult<axum::http::Response<axum::body::Body>> {
2259 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2260 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2261 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2262 }
2263 let path = format!("/api/recorder/replay/{}", urlencoding::encode(&capture_id));
2264 proxy_post_to_deployment_recorder(&deployment, &path).await
2265}
2266
2267pub async fn export_recorder_captures_har(
2272 State(state): State<AppState>,
2273 AuthUser(user_id): AuthUser,
2274 headers: HeaderMap,
2275 Path(deployment_id): Path<Uuid>,
2276) -> ApiResult<axum::http::Response<axum::body::Body>> {
2277 let pool = state.db.pool();
2278
2279 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2280 .await
2281 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2282
2283 let deployment = HostedMock::find_by_id(pool, deployment_id)
2284 .await
2285 .map_err(ApiError::Database)?
2286 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2287
2288 if deployment.org_id != org_ctx.org_id {
2289 return Err(ApiError::InvalidRequest(
2290 "You don't have access to this deployment".to_string(),
2291 ));
2292 }
2293
2294 let _ = state;
2295 proxy_to_deployment_recorder(&deployment, "/api/recorder/export/har").await
2296}
2297
2298pub async fn export_recorder_captures_jsonl(
2304 State(state): State<AppState>,
2305 AuthUser(user_id): AuthUser,
2306 headers: HeaderMap,
2307 Path(deployment_id): Path<Uuid>,
2308) -> ApiResult<axum::http::Response<axum::body::Body>> {
2309 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2310 proxy_to_deployment_recorder(&deployment, "/api/recorder/export/jsonl").await
2311}
2312
2313pub async fn get_deployment_metrics(
2315 State(state): State<AppState>,
2316 AuthUser(user_id): AuthUser,
2317 headers: HeaderMap,
2318 Path(deployment_id): Path<Uuid>,
2319) -> ApiResult<Json<MetricsResponse>> {
2320 let pool = state.db.pool();
2321
2322 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2324 .await
2325 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2326
2327 let deployment = HostedMock::find_by_id(pool, deployment_id)
2329 .await
2330 .map_err(ApiError::Database)?
2331 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2332
2333 if deployment.org_id != org_ctx.org_id {
2335 return Err(ApiError::InvalidRequest(
2336 "You don't have access to this deployment".to_string(),
2337 ));
2338 }
2339
2340 if let Some(client) = crate::fly_metrics::global() {
2345 let app_name = deployment.fly_app_name();
2346 match client.snapshot_for_app(&app_name).await {
2347 Ok(snap) => {
2348 use chrono::Datelike;
2349 let now = Utc::now().date_naive();
2350 let period_start =
2351 chrono::NaiveDate::from_ymd_opt(now.year(), now.month(), 1).unwrap_or(now);
2352 return Ok(Json(MetricsResponse {
2353 requests: snap.requests,
2354 requests_2xx: snap.requests_2xx,
2355 requests_4xx: snap.requests_4xx,
2356 requests_5xx: snap.requests_5xx,
2357 egress_bytes: snap.egress_bytes,
2358 avg_response_time_ms: snap.avg_response_time_ms,
2359 period_start,
2360 }));
2361 }
2362 Err(err) => {
2363 tracing::warn!(
2364 app_name = %app_name,
2365 error = %err,
2366 "Fly Prometheus metrics query failed; falling back to local counters"
2367 );
2368 }
2369 }
2370 }
2371
2372 let metrics = DeploymentMetrics::get_or_create_current(pool, deployment_id)
2375 .await
2376 .map_err(ApiError::Database)?;
2377
2378 Ok(Json(MetricsResponse::from(metrics)))
2379}
2380
2381pub async fn upload_spec(
2383 State(state): State<AppState>,
2384 AuthUser(user_id): AuthUser,
2385 headers: HeaderMap,
2386 mut multipart: Multipart,
2387) -> ApiResult<Json<SpecUploadResponse>> {
2388 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2390 .await
2391 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2392
2393 let checker = PermissionChecker::new(&state);
2395 checker
2396 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2397 .await?;
2398
2399 let mut file_data: Option<Vec<u8>> = None;
2401 let mut file_name = String::from("spec");
2402
2403 while let Some(field) = multipart
2404 .next_field()
2405 .await
2406 .map_err(|e| ApiError::InvalidRequest(format!("Failed to read multipart field: {}", e)))?
2407 {
2408 if field.name() == Some("file") || field.name() == Some("spec") {
2409 if let Some(name) = field.file_name() {
2410 file_name =
2411 name.to_string().replace(".yaml", "").replace(".yml", "").replace(".json", "");
2412 }
2413 let data = field.bytes().await.map_err(|e| {
2414 ApiError::InvalidRequest(format!("Failed to read file data: {}", e))
2415 })?;
2416
2417 let content = String::from_utf8(data.to_vec()).map_err(|_| {
2419 ApiError::InvalidRequest("File must be valid UTF-8 text".to_string())
2420 })?;
2421
2422 let spec_value: serde_json::Value =
2424 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&content) {
2425 v
2426 } else if let Ok(v) = serde_yaml::from_str::<serde_json::Value>(&content) {
2427 v
2428 } else {
2429 return Err(ApiError::InvalidRequest(
2430 "File must be a valid JSON or YAML OpenAPI specification".to_string(),
2431 ));
2432 };
2433
2434 if spec_value.get("openapi").is_none() && spec_value.get("swagger").is_none() {
2436 return Err(ApiError::InvalidRequest(
2437 "File must contain an 'openapi' or 'swagger' field".to_string(),
2438 ));
2439 }
2440
2441 let json_data = serde_json::to_vec_pretty(&spec_value).map_err(|e| {
2443 ApiError::Internal(anyhow::anyhow!("Failed to serialize spec: {}", e))
2444 })?;
2445
2446 file_data = Some(json_data);
2447 }
2448 }
2449
2450 let data = file_data.ok_or_else(|| {
2451 ApiError::InvalidRequest("No 'file' or 'spec' field in upload".to_string())
2452 })?;
2453
2454 let url = state
2456 .storage
2457 .upload_spec(&org_ctx.org_id.to_string(), &file_name, data)
2458 .await
2459 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to upload spec: {}", e)))?;
2460
2461 Ok(Json(SpecUploadResponse { url }))
2462}
2463
2464#[derive(Debug, Serialize)]
2465pub struct SpecUploadResponse {
2466 pub url: String,
2467}
2468
2469#[derive(Debug, Deserialize)]
2472pub struct CreateDeploymentRequest {
2473 pub name: String,
2474 pub slug: Option<String>,
2475 pub description: Option<String>,
2476 pub project_id: Option<Uuid>,
2477 pub config_json: serde_json::Value,
2478 pub openapi_spec_url: Option<String>,
2479 pub region: Option<String>,
2480 #[serde(default)]
2485 pub enabled_protocols: Option<Vec<crate::models::Protocol>>,
2486 #[serde(default)]
2491 pub upstream_url: Option<String>,
2492}
2493
2494#[derive(Debug, Deserialize)]
2495pub struct UpdateStatusRequest {
2496 pub status: String,
2497 pub error_message: Option<String>,
2498 pub deployment_url: Option<String>,
2499 pub internal_url: Option<String>,
2500}
2501
2502#[derive(Debug, Serialize)]
2503pub struct DeploymentResponse {
2504 pub id: Uuid,
2505 pub org_id: Uuid,
2506 pub project_id: Option<Uuid>,
2507 pub name: String,
2508 pub slug: String,
2509 pub description: Option<String>,
2510 pub status: String,
2511 pub deployment_url: Option<String>,
2512 pub openapi_spec_url: Option<String>,
2513 pub region: String,
2514 pub instance_type: String,
2515 pub health_status: String,
2516 pub error_message: Option<String>,
2517 pub enabled_protocols: Vec<crate::models::Protocol>,
2518 pub upstream_url: Option<String>,
2522 pub created_at: DateTime<Utc>,
2523 pub updated_at: DateTime<Utc>,
2524}
2525
2526impl From<HostedMock> for DeploymentResponse {
2527 fn from(mock: HostedMock) -> Self {
2528 let status = mock.status().to_string();
2529 let health_status = mock.health_status().to_string();
2530 let enabled_protocols = mock.enabled_protocols();
2531 let upstream_url = mock.upstream_url();
2532 Self {
2533 id: mock.id,
2534 org_id: mock.org_id,
2535 project_id: mock.project_id,
2536 name: mock.name,
2537 slug: mock.slug,
2538 description: mock.description,
2539 status,
2540 deployment_url: mock.deployment_url,
2541 openapi_spec_url: mock.openapi_spec_url,
2542 region: mock.region,
2543 instance_type: mock.instance_type,
2544 health_status,
2545 error_message: mock.error_message,
2546 enabled_protocols,
2547 upstream_url,
2548 created_at: mock.created_at,
2549 updated_at: mock.updated_at,
2550 }
2551 }
2552}
2553
2554#[derive(Debug, Serialize)]
2555pub struct LogResponse {
2556 pub id: Uuid,
2557 pub level: String,
2558 pub message: String,
2559 pub metadata: serde_json::Value,
2560 pub created_at: DateTime<Utc>,
2561}
2562
2563impl From<DeploymentLog> for LogResponse {
2564 fn from(log: DeploymentLog) -> Self {
2565 Self {
2566 id: log.id,
2567 level: log.level,
2568 message: log.message,
2569 metadata: log.metadata_json,
2570 created_at: log.created_at,
2571 }
2572 }
2573}
2574
2575#[derive(Debug, Serialize)]
2576pub struct MetricsResponse {
2577 pub requests: i64,
2578 pub requests_2xx: i64,
2579 pub requests_4xx: i64,
2580 pub requests_5xx: i64,
2581 pub egress_bytes: i64,
2582 pub avg_response_time_ms: i64,
2583 pub period_start: chrono::NaiveDate,
2584}
2585
2586impl From<DeploymentMetrics> for MetricsResponse {
2587 fn from(metrics: DeploymentMetrics) -> Self {
2588 Self {
2589 requests: metrics.requests,
2590 requests_2xx: metrics.requests_2xx,
2591 requests_4xx: metrics.requests_4xx,
2592 requests_5xx: metrics.requests_5xx,
2593 egress_bytes: metrics.egress_bytes,
2594 avg_response_time_ms: metrics.avg_response_time_ms,
2595 period_start: metrics.period_start,
2596 }
2597 }
2598}
2599
2600#[derive(Debug, Deserialize)]
2601pub struct SetDomainRequest {
2602 pub domain: String,
2603}
2604
2605pub async fn set_domain(
2611 State(state): State<AppState>,
2612 AuthUser(user_id): AuthUser,
2613 headers: HeaderMap,
2614 Path(deployment_id): Path<Uuid>,
2615 Json(request): Json<SetDomainRequest>,
2616) -> ApiResult<Json<serde_json::Value>> {
2617 let pool = state.db.pool();
2618
2619 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2621 .await
2622 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2623
2624 let checker = PermissionChecker::new(&state);
2626 checker
2627 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2628 .await?;
2629
2630 let deployment = HostedMock::find_by_id(pool, deployment_id)
2632 .await
2633 .map_err(ApiError::Database)?
2634 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2635
2636 if deployment.org_id != org_ctx.org_id {
2638 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2639 }
2640
2641 let hostname = format!("{}.{}", deployment.slug, request.domain);
2642
2643 let new_url = format!("https://{}", hostname);
2650 sqlx::query(
2651 r#"
2652 UPDATE hosted_mocks
2653 SET deployment_url = $1,
2654 metadata_json = jsonb_set(
2655 COALESCE(metadata_json, '{}'::jsonb),
2656 '{custom_domain}',
2657 to_jsonb($2::text)
2658 ),
2659 updated_at = NOW()
2660 WHERE id = $3
2661 "#,
2662 )
2663 .bind(&new_url)
2664 .bind(&hostname)
2665 .bind(deployment_id)
2666 .execute(pool)
2667 .await
2668 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to update deployment URL: {}", e)))?;
2669
2670 DeploymentLog::create(
2671 pool,
2672 deployment_id,
2673 "info",
2674 &format!("Custom domain set: {}", hostname),
2675 None,
2676 )
2677 .await
2678 .ok();
2679
2680 Ok(Json(serde_json::json!({
2681 "hostname": hostname,
2682 "deployment_url": new_url,
2683 })))
2684}
2685
2686pub async fn get_custom_domain(
2689 State(state): State<AppState>,
2690 AuthUser(user_id): AuthUser,
2691 headers: HeaderMap,
2692 Path(deployment_id): Path<Uuid>,
2693) -> ApiResult<Json<serde_json::Value>> {
2694 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2695 let hostname = deployment
2696 .metadata_json
2697 .get("custom_domain")
2698 .and_then(|v| v.as_str())
2699 .map(|s| s.to_string());
2700 Ok(Json(serde_json::json!({
2701 "hostname": hostname,
2702 "deployment_url": deployment.deployment_url,
2703 })))
2704}
2705
2706pub async fn clear_custom_domain(
2712 State(state): State<AppState>,
2713 AuthUser(user_id): AuthUser,
2714 headers: HeaderMap,
2715 Path(deployment_id): Path<Uuid>,
2716) -> ApiResult<Json<serde_json::Value>> {
2717 let pool = state.db.pool();
2718
2719 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2720 .await
2721 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2722
2723 let checker = PermissionChecker::new(&state);
2724 checker
2725 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2726 .await?;
2727
2728 let deployment = HostedMock::find_by_id(pool, deployment_id)
2729 .await
2730 .map_err(ApiError::Database)?
2731 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2732 if deployment.org_id != org_ctx.org_id {
2733 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2734 }
2735
2736 let app_name = deployment.fly_app_name();
2737 let default_url = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
2738 format!("https://{}.{}", deployment.slug, domain)
2739 } else {
2740 format!("https://{}.fly.dev", app_name)
2741 };
2742
2743 sqlx::query(
2744 r#"
2745 UPDATE hosted_mocks
2746 SET deployment_url = $1,
2747 metadata_json = COALESCE(metadata_json, '{}'::jsonb) - 'custom_domain',
2748 updated_at = NOW()
2749 WHERE id = $2
2750 "#,
2751 )
2752 .bind(&default_url)
2753 .bind(deployment_id)
2754 .execute(pool)
2755 .await
2756 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to clear custom domain: {}", e)))?;
2757
2758 DeploymentLog::create(pool, deployment_id, "info", "Custom domain removed", None)
2759 .await
2760 .ok();
2761
2762 Ok(Json(serde_json::json!({
2763 "hostname": serde_json::Value::Null,
2764 "deployment_url": default_url,
2765 })))
2766}
2767
2768#[derive(Debug, Default, Deserialize)]
2773#[serde(default, rename_all = "camelCase")]
2774pub struct TriggerSmokeRunRequest {
2775 pub latency_budget_ms: Option<u64>,
2777 pub methods: Option<Vec<String>>,
2781}
2782
2783pub async fn trigger_smoke_run(
2807 State(state): State<AppState>,
2808 AuthUser(user_id): AuthUser,
2809 Path(deployment_id): Path<Uuid>,
2810 headers: HeaderMap,
2811 body: Option<Json<TriggerSmokeRunRequest>>,
2812) -> ApiResult<Json<TestRun>> {
2813 let req = body.map(|Json(r)| r).unwrap_or_default();
2814
2815 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2817 .await
2818 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2819
2820 let checker = PermissionChecker::new(&state);
2821 checker
2822 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
2823 .await?;
2824
2825 let deployment = state
2826 .store
2827 .find_hosted_mock_by_id(deployment_id)
2828 .await?
2829 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2830 if deployment.org_id != org_ctx.org_id {
2831 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2833 }
2834 if deployment.status != "active" {
2835 return Err(ApiError::InvalidRequest(format!(
2842 "Deployment is in '{}' status; smoke runs require 'active'",
2843 deployment.status,
2844 )));
2845 }
2846
2847 let limits = crate::handlers::usage::effective_limits(&state, &org_ctx.org).await?;
2849 let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
2850 if max_concurrent == 0 {
2851 return Err(ApiError::ResourceLimitExceeded(
2852 "Test execution is not enabled on this plan".into(),
2853 ));
2854 }
2855 if max_concurrent > 0 {
2856 let inflight = TestRun::count_inflight(state.db.pool(), org_ctx.org_id)
2857 .await
2858 .map_err(ApiError::Database)?;
2859 if inflight.total() >= max_concurrent {
2860 return Err(ApiError::ResourceLimitExceeded(format!(
2861 "Concurrent run limit reached ({}/{}).",
2862 inflight.total(),
2863 max_concurrent,
2864 )));
2865 }
2866 }
2867
2868 let payload = build_smoke_payload(&deployment, &req)?;
2870
2871 let run = TestRun::enqueue(
2873 state.db.pool(),
2874 EnqueueTestRun {
2875 suite_id: deployment.id,
2876 org_id: org_ctx.org_id,
2877 kind: "smoke",
2878 triggered_by: "manual",
2879 triggered_by_user: Some(user_id),
2880 git_ref: None,
2881 git_sha: None,
2882 },
2883 )
2884 .await
2885 .map_err(ApiError::Database)?;
2886
2887 if let Err(e) = crate::run_queue::enqueue(
2889 state.redis.as_ref(),
2890 crate::run_queue::EnqueuedJob {
2891 run_id: run.id,
2892 org_id: run.org_id,
2893 source_id: deployment.id,
2894 kind: "smoke",
2895 payload,
2896 },
2897 )
2898 .await
2899 {
2900 tracing::error!(run_id = %run.id, error = %e, "failed to enqueue smoke run");
2905 }
2906
2907 Ok(Json(run))
2908}
2909
2910fn build_smoke_payload(
2915 deployment: &HostedMock,
2916 req: &TriggerSmokeRunRequest,
2917) -> ApiResult<serde_json::Value> {
2918 let base_url =
2919 deployment.deployment_url.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
2920 ApiError::InvalidRequest(
2921 "Deployment has no public URL — wait for the deploy to finish before running smoke"
2922 .to_string(),
2923 )
2924 })?;
2925 let spec_url =
2926 deployment
2927 .openapi_spec_url
2928 .as_deref()
2929 .filter(|s| !s.is_empty())
2930 .ok_or_else(|| {
2931 ApiError::InvalidRequest(
2932 "Deployment has no OpenAPI spec — upload one before running smoke".to_string(),
2933 )
2934 })?;
2935
2936 let mut payload = serde_json::json!({
2937 "deployment_id": deployment.id,
2938 "base_url": base_url,
2939 "openapi_spec_url": spec_url,
2940 });
2941 let obj = payload
2942 .as_object_mut()
2943 .expect("payload was constructed as an object on the line above");
2944
2945 if let Some(budget) = req.latency_budget_ms {
2946 obj.insert("latency_budget_ms".into(), budget.into());
2947 }
2948 if let Some(methods) = req.methods.as_ref() {
2949 let v = serde_json::to_value(methods)
2952 .map_err(|e| ApiError::InvalidRequest(format!("invalid methods array: {e}")))?;
2953 obj.insert("methods".into(), v);
2954 }
2955
2956 Ok(payload)
2957}
2958
2959#[cfg(test)]
2960mod smoke_trigger_tests {
2961 use super::*;
2962
2963 fn deployment_with(
2964 status: &str,
2965 deployment_url: Option<&str>,
2966 spec_url: Option<&str>,
2967 ) -> HostedMock {
2968 HostedMock {
2973 id: Uuid::new_v4(),
2974 org_id: Uuid::new_v4(),
2975 project_id: None,
2976 name: "test".to_string(),
2977 slug: "test".to_string(),
2978 description: None,
2979 config_json: serde_json::json!({}),
2980 openapi_spec_url: spec_url.map(String::from),
2981 status: status.to_string(),
2982 deployment_url: deployment_url.map(String::from),
2983 internal_url: None,
2984 region: "iad".to_string(),
2985 instance_type: "shared-cpu-1x".to_string(),
2986 health_check_url: None,
2987 last_health_check: None,
2988 health_status: "unknown".to_string(),
2989 error_message: None,
2990 metadata_json: serde_json::json!({}),
2991 created_at: Utc::now(),
2992 updated_at: Utc::now(),
2993 deleted_at: None,
2994 }
2995 }
2996
2997 #[test]
2998 fn build_smoke_payload_uses_deployment_urls() {
2999 let dep = deployment_with(
3000 "running",
3001 Some("https://my-mock.fly.dev"),
3002 Some("https://specs.example.com/abc.json"),
3003 );
3004 let req = TriggerSmokeRunRequest::default();
3005 let payload = build_smoke_payload(&dep, &req).unwrap();
3006 assert_eq!(payload["base_url"], "https://my-mock.fly.dev");
3007 assert_eq!(payload["openapi_spec_url"], "https://specs.example.com/abc.json");
3008 assert_eq!(payload["deployment_id"], serde_json::json!(dep.id));
3009 assert!(payload.get("latency_budget_ms").is_none());
3010 assert!(payload.get("methods").is_none());
3011 }
3012
3013 #[test]
3014 fn build_smoke_payload_passes_overrides() {
3015 let dep = deployment_with("running", Some("https://x"), Some("https://y"));
3016 let req = TriggerSmokeRunRequest {
3017 latency_budget_ms: Some(2000),
3018 methods: Some(vec!["GET".into(), "HEAD".into()]),
3019 };
3020 let payload = build_smoke_payload(&dep, &req).unwrap();
3021 assert_eq!(payload["latency_budget_ms"], 2000);
3022 assert_eq!(payload["methods"], serde_json::json!(["GET", "HEAD"]));
3023 }
3024
3025 #[test]
3026 fn build_smoke_payload_rejects_missing_deployment_url() {
3027 let dep = deployment_with("running", None, Some("https://y"));
3028 let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
3029 match err {
3030 ApiError::InvalidRequest(msg) => assert!(msg.contains("public URL")),
3031 other => panic!("expected InvalidRequest, got {other:?}"),
3032 }
3033 }
3034
3035 #[test]
3036 fn build_smoke_payload_rejects_empty_deployment_url() {
3037 let dep = deployment_with("running", Some(""), Some("https://y"));
3038 assert!(matches!(
3039 build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()),
3040 Err(ApiError::InvalidRequest(_))
3041 ));
3042 }
3043
3044 #[test]
3045 fn build_smoke_payload_rejects_missing_spec_url() {
3046 let dep = deployment_with("running", Some("https://x"), None);
3047 let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
3048 match err {
3049 ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
3050 other => panic!("expected InvalidRequest, got {other:?}"),
3051 }
3052 }
3053}