1use axum::{
6 extract::{Multipart, Path, Query, State},
7 http::HeaderMap,
8 response::sse::{Event, KeepAlive, Sse},
9 Json,
10};
11use chrono::{DateTime, Utc};
12use futures_util::stream::Stream;
13use serde::{Deserialize, Serialize};
14use std::convert::Infallible;
15use uuid::Uuid;
16
17use crate::{
18 deployment::flyio::FlyioClient,
19 error::{ApiError, ApiResult},
20 fly_logs::LogEntry,
21 middleware::{
22 permission_check::PermissionChecker, permissions::Permission, resolve_org_context, AuthUser,
23 },
24 models::{
25 feature_usage::FeatureType, AuditEventType, DeploymentLog, DeploymentMetrics,
26 DeploymentStatus, HostedMock, Subscription, SubscriptionStatus, TestRun,
27 },
28 AppState,
29};
30use mockforge_registry_core::models::test_run::EnqueueTestRun;
31use tracing::warn;
32
33pub async fn create_deployment(
35 State(state): State<AppState>,
36 AuthUser(user_id): AuthUser,
37 headers: HeaderMap,
38 Json(request): Json<CreateDeploymentRequest>,
39) -> ApiResult<Json<DeploymentResponse>> {
40 let pool = state.db.pool();
41
42 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
44 .await
45 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
46
47 let checker = PermissionChecker::new(&state);
49 checker
50 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
51 .await?;
52
53 if let Some(subscription) = Subscription::find_by_org(pool, org_ctx.org_id).await? {
71 if subscription.status() == SubscriptionStatus::PastDue {
72 const PAST_DUE_GRACE_SECONDS: i64 = 24 * 60 * 60;
73 let elapsed = (chrono::Utc::now() - subscription.updated_at).num_seconds();
74 if elapsed > PAST_DUE_GRACE_SECONDS {
75 return Err(ApiError::InvalidRequest(
76 "Subscription has been past due for over 24 hours. Please update your payment method in the billing portal before deploying new mocks.".to_string(),
77 ));
78 }
79 tracing::info!(
80 org_id = %org_ctx.org_id,
81 past_due_seconds = elapsed,
82 "past_due subscription within 24h grace; allowing deploy"
83 );
84 }
85 }
86
87 let limits = &org_ctx.org.limits_json;
89 let max_hosted_mocks = limits.get("max_hosted_mocks").and_then(|v| v.as_i64()).unwrap_or(0);
90
91 if max_hosted_mocks >= 0 {
92 let existing = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
94
95 let active_count = existing
96 .iter()
97 .filter(|m| {
98 matches!(m.status(), DeploymentStatus::Active | DeploymentStatus::Deploying)
99 })
100 .count();
101
102 if active_count as i64 >= max_hosted_mocks {
103 return Err(ApiError::InvalidRequest(format!(
104 "Hosted mocks limit exceeded. Your plan allows {} active deployments. Upgrade to deploy more.",
105 max_hosted_mocks
106 )));
107 }
108 }
109
110 let generated_slug;
112 let slug = match request.slug.as_deref() {
113 Some(s) => s,
114 None => {
115 generated_slug = request
116 .name
117 .to_lowercase()
118 .chars()
119 .map(|c| {
120 if c.is_alphanumeric() || c == '-' {
121 c
122 } else {
123 '-'
124 }
125 })
126 .collect::<String>()
127 .trim_matches('-')
128 .replace("--", "-");
129 &generated_slug
130 }
131 };
132
133 if !slug.chars().all(|c| c.is_alphanumeric() || c == '-') {
134 return Err(ApiError::InvalidRequest(
135 "Slug must contain only alphanumeric characters and hyphens".to_string(),
136 ));
137 }
138
139 if state.store.find_hosted_mock_by_slug(org_ctx.org_id, slug).await?.is_some() {
141 return Err(ApiError::InvalidRequest(format!(
142 "A deployment with slug '{}' already exists",
143 slug
144 )));
145 }
146
147 let mut enabled_protocols = request.enabled_protocols.clone().unwrap_or_default();
152 if !enabled_protocols.contains(&crate::models::Protocol::Http) {
153 enabled_protocols.insert(0, crate::models::Protocol::Http);
154 }
155 let plan = org_ctx.org.plan.clone();
156 if !crate::models::protocols_allowed_on_plan(&enabled_protocols, &plan) {
157 let blocked: Vec<String> = enabled_protocols
158 .iter()
159 .filter(|p| !crate::models::protocols_allowed_on_plan(&[**p], &plan))
160 .map(|p| format!("{:?}", p))
161 .collect();
162 return Err(ApiError::InvalidRequest(format!(
163 "These protocols require a higher plan than '{}': {}",
164 plan,
165 blocked.join(", ")
166 )));
167 }
168
169 let mut config_json = request.config_json.clone();
173 if !config_json.is_object() {
174 config_json = serde_json::json!({});
175 }
176 if let Some(obj) = config_json.as_object_mut() {
177 obj.insert(
178 "enabled_protocols".to_string(),
179 serde_json::to_value(&enabled_protocols).unwrap_or(serde_json::Value::Null),
180 );
181 if let Some(upstream) = request.upstream_url.as_ref() {
182 let trimmed = upstream.trim();
183 if !trimmed.is_empty() {
184 obj.insert(
185 "upstream_url".to_string(),
186 serde_json::Value::String(trimmed.to_string()),
187 );
188 }
189 }
190 }
191
192 let deployment = state
194 .store
195 .create_hosted_mock(
196 org_ctx.org_id,
197 request.project_id,
198 &request.name,
199 slug,
200 request.description.as_deref(),
201 config_json,
202 request.openapi_spec_url.as_deref(),
203 request.region.as_deref(),
204 )
205 .await?;
206
207 DeploymentLog::create(
209 pool,
210 deployment.id,
211 "info",
212 "Deployment created",
213 Some(serde_json::json!({
214 "name": request.name,
215 "slug": slug,
216 })),
217 )
218 .await
219 .map_err(ApiError::Database)?;
220
221 state
224 .store
225 .update_hosted_mock_status(deployment.id, DeploymentStatus::Pending, None)
226 .await?;
227
228 state
230 .store
231 .record_feature_usage(
232 org_ctx.org_id,
233 Some(user_id),
234 FeatureType::HostedMockDeploy,
235 Some(serde_json::json!({
236 "deployment_id": deployment.id,
237 "name": request.name,
238 "slug": slug,
239 })),
240 )
241 .await;
242
243 let ip_address = headers
245 .get("X-Forwarded-For")
246 .or_else(|| headers.get("X-Real-IP"))
247 .and_then(|h| h.to_str().ok())
248 .map(|s| s.split(',').next().unwrap_or(s).trim());
249 let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
250
251 state
252 .store
253 .record_audit_event(
254 org_ctx.org_id,
255 Some(user_id),
256 AuditEventType::DeploymentCreated,
257 format!("Hosted mock deployment '{}' created", request.name),
258 Some(serde_json::json!({
259 "deployment_id": deployment.id,
260 "name": request.name,
261 "slug": slug,
262 "project_id": request.project_id,
263 })),
264 ip_address,
265 user_agent,
266 )
267 .await;
268
269 let deployment = state.store.find_hosted_mock_by_id(deployment.id).await?.ok_or_else(|| {
271 ApiError::Internal(anyhow::anyhow!("Failed to retrieve created deployment"))
272 })?;
273
274 Ok(Json(DeploymentResponse::from(deployment)))
275}
276
277pub async fn list_deployments(
279 State(state): State<AppState>,
280 AuthUser(user_id): AuthUser,
281 headers: HeaderMap,
282) -> ApiResult<Json<Vec<DeploymentResponse>>> {
283 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
285 .await
286 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
287
288 let deployments = state.store.list_hosted_mocks_by_org(org_ctx.org_id).await?;
290
291 let responses: Vec<DeploymentResponse> =
292 deployments.into_iter().map(DeploymentResponse::from).collect();
293
294 Ok(Json(responses))
295}
296
297pub async fn get_deployment(
299 State(state): State<AppState>,
300 AuthUser(user_id): AuthUser,
301 headers: HeaderMap,
302 Path(deployment_id): Path<Uuid>,
303) -> ApiResult<Json<DeploymentResponse>> {
304 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
306 .await
307 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
308
309 let deployment = state
311 .store
312 .find_hosted_mock_by_id(deployment_id)
313 .await?
314 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
315
316 if deployment.org_id != org_ctx.org_id {
318 return Err(ApiError::InvalidRequest(
319 "You don't have access to this deployment".to_string(),
320 ));
321 }
322
323 Ok(Json(DeploymentResponse::from(deployment)))
324}
325
326pub async fn update_deployment_status(
328 State(state): State<AppState>,
329 AuthUser(user_id): AuthUser,
330 headers: HeaderMap,
331 Path(deployment_id): Path<Uuid>,
332 Json(request): Json<UpdateStatusRequest>,
333) -> ApiResult<Json<DeploymentResponse>> {
334 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
336 .await
337 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
338
339 let checker = PermissionChecker::new(&state);
341 checker
342 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
343 .await?;
344
345 let deployment = state
347 .store
348 .find_hosted_mock_by_id(deployment_id)
349 .await?
350 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
351
352 if deployment.org_id != org_ctx.org_id {
354 return Err(ApiError::InvalidRequest(
355 "You don't have access to this deployment".to_string(),
356 ));
357 }
358
359 let status = DeploymentStatus::from_str(&request.status)
361 .ok_or_else(|| ApiError::InvalidRequest("Invalid status".to_string()))?;
362
363 state
364 .store
365 .update_hosted_mock_status(deployment_id, status, request.error_message.as_deref())
366 .await?;
367
368 if request.deployment_url.is_some() || request.internal_url.is_some() {
370 state
371 .store
372 .update_hosted_mock_urls(
373 deployment_id,
374 request.deployment_url.as_deref(),
375 request.internal_url.as_deref(),
376 )
377 .await?;
378 }
379
380 let deployment = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
382 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
383 })?;
384
385 if let Ok(Some(org)) = state.store.find_organization_by_id(deployment.org_id).await {
387 if let Ok(Some(owner)) = state.store.find_user_by_id(org.owner_id).await {
388 let status_str = format!("{:?}", deployment.status()).to_lowercase();
389 let email_msg = crate::email::EmailService::generate_deployment_status_email(
390 &owner.username,
391 &owner.email,
392 &deployment.name,
393 &status_str,
394 deployment.deployment_url.as_deref(),
395 request.error_message.as_deref(),
396 );
397
398 tokio::spawn(async move {
399 match crate::email::EmailService::from_env() {
400 Ok(email_service) => {
401 if let Err(e) = email_service.send(email_msg).await {
402 tracing::warn!("Failed to send deployment status email: {}", e);
403 }
404 }
405 Err(e) => {
406 tracing::warn!("Failed to create email service: {}", e);
407 }
408 }
409 });
410 }
411 }
412
413 Ok(Json(DeploymentResponse::from(deployment)))
414}
415
416pub async fn delete_deployment(
418 State(state): State<AppState>,
419 AuthUser(user_id): AuthUser,
420 headers: HeaderMap,
421 Path(deployment_id): Path<Uuid>,
422) -> ApiResult<Json<serde_json::Value>> {
423 let pool = state.db.pool();
424
425 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
427 .await
428 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
429
430 let checker = PermissionChecker::new(&state);
432 checker
433 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockDelete)
434 .await?;
435
436 let deployment = state
438 .store
439 .find_hosted_mock_by_id(deployment_id)
440 .await?
441 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
442
443 if deployment.org_id != org_ctx.org_id {
445 return Err(ApiError::InvalidRequest(
446 "You don't have access to this deployment".to_string(),
447 ));
448 }
449
450 let ip_address = headers
452 .get("X-Forwarded-For")
453 .or_else(|| headers.get("X-Real-IP"))
454 .and_then(|h| h.to_str().ok())
455 .map(|s| s.split(',').next().unwrap_or(s).trim());
456 let user_agent = headers.get("User-Agent").and_then(|h| h.to_str().ok());
457
458 state
459 .store
460 .record_audit_event(
461 org_ctx.org_id,
462 Some(user_id),
463 AuditEventType::DeploymentDeleted,
464 format!("Hosted mock deployment '{}' deleted", deployment.name),
465 Some(serde_json::json!({
466 "deployment_id": deployment.id,
467 "name": deployment.name,
468 "slug": deployment.slug,
469 })),
470 ip_address,
471 user_agent,
472 )
473 .await;
474
475 state
477 .store
478 .update_hosted_mock_status(deployment_id, DeploymentStatus::Deleting, None)
479 .await?;
480
481 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
484 let flyio_client = FlyioClient::new(flyio_token);
485
486 let app_name = format!(
488 "mockforge-{}-{}",
489 deployment
490 .org_id
491 .to_string()
492 .replace("-", "")
493 .chars()
494 .take(8)
495 .collect::<String>(),
496 deployment.slug
497 );
498
499 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
501
502 if let Some(machine_id) = machine_id {
503 match flyio_client.delete_machine(&app_name, machine_id).await {
505 Ok(_) => {
506 DeploymentLog::create(
507 pool,
508 deployment_id,
509 "info",
510 &format!("Deleted Fly.io machine: {}", machine_id),
511 None,
512 )
513 .await
514 .ok();
515 }
516 Err(e) => {
517 warn!("Failed to delete Fly.io machine {}: {}", machine_id, e);
518 DeploymentLog::create(
519 pool,
520 deployment_id,
521 "warning",
522 &format!("Failed to delete Fly.io machine: {}", e),
523 None,
524 )
525 .await
526 .ok();
527 }
529 }
530 } else {
531 warn!(
533 "Machine ID not found in metadata for deployment {}, attempting to list machines",
534 deployment_id
535 );
536 match flyio_client.list_machines(&app_name).await {
537 Ok(machines) => {
538 for machine in machines {
539 if let Err(e) = flyio_client.delete_machine(&app_name, &machine.id).await {
540 warn!("Failed to delete Fly.io machine {}: {}", machine.id, e);
541 } else {
542 DeploymentLog::create(
543 pool,
544 deployment_id,
545 "info",
546 &format!("Deleted Fly.io machine: {}", machine.id),
547 None,
548 )
549 .await
550 .ok();
551 }
552 }
553 }
554 Err(e) => {
555 warn!("Failed to list Fly.io machines for app {}: {}", app_name, e);
556 }
558 }
559 }
560
561 match flyio_client.delete_app(&app_name).await {
563 Ok(_) => {
564 DeploymentLog::create(
565 pool,
566 deployment_id,
567 "info",
568 &format!("Deleted Fly.io app: {}", app_name),
569 None,
570 )
571 .await
572 .ok();
573 }
574 Err(e) => {
575 warn!("Failed to delete Fly.io app {}: {}", app_name, e);
576 }
577 }
578 }
579
580 state.store.delete_hosted_mock(deployment_id).await?;
582
583 DeploymentLog::create(pool, deployment_id, "info", "Deployment deleted successfully", None)
584 .await
585 .ok(); Ok(Json(serde_json::json!({
588 "success": true,
589 "message": "Deployment deleted"
590 })))
591}
592
593#[derive(Debug, Deserialize, Default)]
595pub struct RedeployRequest {
596 pub config_json: Option<serde_json::Value>,
598 pub openapi_spec_url: Option<String>,
600}
601
602pub async fn redeploy_deployment(
606 State(state): State<AppState>,
607 AuthUser(user_id): AuthUser,
608 headers: HeaderMap,
609 Path(deployment_id): Path<Uuid>,
610 body: Option<Json<RedeployRequest>>,
611) -> ApiResult<Json<serde_json::Value>> {
612 let pool = state.db.pool();
613
614 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
616 .await
617 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
618
619 let checker = PermissionChecker::new(&state);
621 checker
622 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
623 .await?;
624 let deployment = state
628 .store
629 .find_hosted_mock_by_id(deployment_id)
630 .await?
631 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
632
633 if deployment.org_id != org_ctx.org_id {
635 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
636 }
637
638 let status = deployment.status();
640 if !matches!(status, DeploymentStatus::Active | DeploymentStatus::Failed) {
641 return Err(ApiError::InvalidRequest(format!(
642 "Cannot redeploy a deployment with status '{}'. Must be 'active' or 'failed'.",
643 status
644 )));
645 }
646
647 let request = body.map(|b| b.0).unwrap_or_default();
649 if request.config_json.is_some() || request.openapi_spec_url.is_some() {
650 let mut query = String::from("UPDATE hosted_mocks SET updated_at = NOW()");
651 let mut param_count = 0;
652
653 if request.config_json.is_some() {
654 param_count += 1;
655 query.push_str(&format!(", config_json = ${}", param_count));
656 }
657 if request.openapi_spec_url.is_some() {
658 param_count += 1;
659 query.push_str(&format!(", openapi_spec_url = ${}", param_count));
660 }
661 param_count += 1;
662 query.push_str(&format!(" WHERE id = ${}", param_count));
663
664 let mut q = sqlx::query(&query);
665 if let Some(ref config) = request.config_json {
666 q = q.bind(config);
667 }
668 if let Some(ref spec_url) = request.openapi_spec_url {
669 q = q.bind(spec_url);
670 }
671 q = q.bind(deployment_id);
672 q.execute(pool).await.map_err(|e| {
673 ApiError::Internal(anyhow::anyhow!("Failed to update deployment: {}", e))
674 })?;
675 }
676
677 state
679 .store
680 .update_hosted_mock_status(deployment_id, DeploymentStatus::Deploying, None)
681 .await?;
682
683 DeploymentLog::create(pool, deployment_id, "info", "Redeployment initiated", None)
684 .await
685 .ok();
686
687 let pool_clone = pool.clone();
689 let deployment_id_clone = deployment_id;
690 tokio::spawn(async move {
691 let pool = &pool_clone;
692
693 let updated_deployment = match HostedMock::find_by_id(pool, deployment_id_clone).await {
695 Ok(Some(d)) => d,
696 Ok(None) => {
697 tracing::error!("Deployment {} not found during redeploy", deployment_id_clone);
698 return;
699 }
700 Err(e) => {
701 tracing::error!(
702 "Failed to fetch deployment {} for redeploy: {}",
703 deployment_id_clone,
704 e
705 );
706 return;
707 }
708 };
709
710 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
712 let flyio_client = FlyioClient::new(flyio_token);
713
714 let machine_id = updated_deployment
715 .metadata_json
716 .get("flyio_machine_id")
717 .and_then(|v| v.as_str());
718
719 if let Some(machine_id) = machine_id {
720 let app_name = format!(
721 "mockforge-{}-{}",
722 updated_deployment
723 .org_id
724 .to_string()
725 .replace('-', "")
726 .chars()
727 .take(8)
728 .collect::<String>(),
729 updated_deployment.slug
730 );
731
732 let mut env = std::collections::HashMap::new();
734 env.insert(
735 "MOCKFORGE_DEPLOYMENT_ID".to_string(),
736 updated_deployment.id.to_string(),
737 );
738 env.insert("MOCKFORGE_ORG_ID".to_string(), updated_deployment.org_id.to_string());
739 if let Ok(config_str) = serde_json::to_string(&updated_deployment.config_json) {
740 env.insert("MOCKFORGE_CONFIG".to_string(), config_str);
741 }
742 env.insert("PORT".to_string(), "3000".to_string());
743
744 if let Some(ref spec_url) = updated_deployment.openapi_spec_url {
745 env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
746 }
747
748 let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
749 .unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
750
751 use crate::deployment::flyio::{
752 FlyioCheck, FlyioMachineConfig, FlyioPort, FlyioRegistryAuth, FlyioService,
753 };
754
755 let services = vec![FlyioService {
756 protocol: "tcp".to_string(),
757 internal_port: 3000,
758 ports: vec![
759 FlyioPort {
760 port: 80,
761 handlers: vec!["http".to_string()],
762 },
763 FlyioPort {
764 port: 443,
765 handlers: vec!["tls".to_string(), "http".to_string()],
766 },
767 ],
768 }];
769
770 let mut checks = std::collections::HashMap::new();
771 checks.insert(
772 "alive".to_string(),
773 FlyioCheck {
774 check_type: "http".to_string(),
775 port: 3000,
776 grace_period: "10s".to_string(),
777 interval: "15s".to_string(),
778 method: "GET".to_string(),
779 timeout: "2s".to_string(),
780 tls_skip_verify: false,
781 path: Some("/health/live".to_string()),
782 },
783 );
784
785 let machine_config = FlyioMachineConfig {
786 image,
787 env,
788 services,
789 checks: Some(checks),
790 guest: None,
795 };
796
797 let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
799 std::env::var("DOCKER_REGISTRY_SERVER"),
800 std::env::var("DOCKER_REGISTRY_USERNAME"),
801 std::env::var("DOCKER_REGISTRY_PASSWORD"),
802 ) {
803 Some(FlyioRegistryAuth {
804 server,
805 username,
806 password,
807 })
808 } else if machine_config.image.starts_with("registry.fly.io/") {
809 Some(FlyioRegistryAuth {
810 server: "registry.fly.io".to_string(),
811 username: "x".to_string(),
812 password: flyio_client.api_token().to_string(),
813 })
814 } else {
815 None
816 };
817
818 match flyio_client
819 .update_machine(&app_name, machine_id, machine_config, registry_auth)
820 .await
821 {
822 Ok(_) => {
823 let _ = DeploymentLog::create(
824 pool,
825 deployment_id_clone,
826 "info",
827 "Machine updated and restarting",
828 None,
829 )
830 .await;
831 }
832 Err(e) => {
833 tracing::error!("Redeployment failed for {}: {:#}", deployment_id_clone, e);
834 let _ = HostedMock::update_status(
835 pool,
836 deployment_id_clone,
837 DeploymentStatus::Failed,
838 Some(&format!("Redeployment failed: {}", e)),
839 )
840 .await;
841 let _ = DeploymentLog::create(
842 pool,
843 deployment_id_clone,
844 "error",
845 &format!("Redeployment failed: {}", e),
846 None,
847 )
848 .await;
849 return;
850 }
851 }
852 } else {
853 tracing::error!(
854 "No Fly.io machine ID found for deployment {}",
855 deployment_id_clone
856 );
857 let _ = HostedMock::update_status(
858 pool,
859 deployment_id_clone,
860 DeploymentStatus::Failed,
861 Some("No Fly.io machine ID found in deployment metadata"),
862 )
863 .await;
864 return;
865 }
866 }
867
868 let _ =
870 HostedMock::update_status(pool, deployment_id_clone, DeploymentStatus::Active, None)
871 .await;
872
873 let _ = DeploymentLog::create(
874 pool,
875 deployment_id_clone,
876 "info",
877 "Redeployment completed successfully",
878 None,
879 )
880 .await;
881
882 tracing::info!("Successfully redeployed mock service: {}", deployment_id_clone);
883 });
884
885 Ok(Json(serde_json::json!({
886 "id": deployment_id,
887 "status": "deploying",
888 "message": "Redeployment initiated"
889 })))
890}
891
892fn deployment_app_name(deployment: &HostedMock) -> String {
893 format!(
894 "mockforge-{}-{}",
895 deployment
896 .org_id
897 .to_string()
898 .replace('-', "")
899 .chars()
900 .take(8)
901 .collect::<String>(),
902 deployment.slug
903 )
904}
905
906pub async fn stop_deployment(
911 State(state): State<AppState>,
912 AuthUser(user_id): AuthUser,
913 headers: HeaderMap,
914 Path(deployment_id): Path<Uuid>,
915) -> ApiResult<Json<DeploymentResponse>> {
916 let pool = state.db.pool();
917
918 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
919 .await
920 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
921
922 let checker = PermissionChecker::new(&state);
923 checker
924 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
925 .await?;
926
927 let deployment = state
928 .store
929 .find_hosted_mock_by_id(deployment_id)
930 .await?
931 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
932
933 if deployment.org_id != org_ctx.org_id {
934 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
935 }
936
937 let status = deployment.status();
938 if !matches!(status, DeploymentStatus::Active) {
939 return Err(ApiError::InvalidRequest(format!(
940 "Cannot stop a deployment with status '{}'. Must be 'active'.",
941 status
942 )));
943 }
944
945 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
946 let flyio_client = FlyioClient::new(flyio_token);
947 let app_name = deployment_app_name(&deployment);
948 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
949
950 if let Some(machine_id) = machine_id {
951 flyio_client.stop_machine(&app_name, machine_id).await.map_err(|e| {
952 ApiError::Internal(anyhow::anyhow!("Failed to stop machine: {}", e))
953 })?;
954 } else {
955 warn!(
956 "No Fly.io machine ID found for deployment {}; marking as stopped anyway",
957 deployment_id
958 );
959 }
960 }
961
962 state
963 .store
964 .update_hosted_mock_status(deployment_id, DeploymentStatus::Stopped, None)
965 .await?;
966
967 DeploymentLog::create(pool, deployment_id, "info", "Deployment stopped", None)
968 .await
969 .ok();
970
971 let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
972 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
973 })?;
974
975 Ok(Json(DeploymentResponse::from(updated)))
976}
977
978pub async fn start_deployment(
980 State(state): State<AppState>,
981 AuthUser(user_id): AuthUser,
982 headers: HeaderMap,
983 Path(deployment_id): Path<Uuid>,
984) -> ApiResult<Json<DeploymentResponse>> {
985 let pool = state.db.pool();
986
987 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
988 .await
989 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
990
991 let checker = PermissionChecker::new(&state);
992 checker
993 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
994 .await?;
995
996 let deployment = state
997 .store
998 .find_hosted_mock_by_id(deployment_id)
999 .await?
1000 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1001
1002 if deployment.org_id != org_ctx.org_id {
1003 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
1004 }
1005
1006 let status = deployment.status();
1007 if !matches!(status, DeploymentStatus::Stopped) {
1008 return Err(ApiError::InvalidRequest(format!(
1009 "Cannot start a deployment with status '{}'. Must be 'stopped'.",
1010 status
1011 )));
1012 }
1013
1014 if let Ok(flyio_token) = std::env::var("FLYIO_API_TOKEN") {
1015 let flyio_client = FlyioClient::new(flyio_token);
1016 let app_name = deployment_app_name(&deployment);
1017 let machine_id = deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str());
1018
1019 if let Some(machine_id) = machine_id {
1020 flyio_client.start_machine(&app_name, machine_id).await.map_err(|e| {
1021 ApiError::Internal(anyhow::anyhow!("Failed to start machine: {}", e))
1022 })?;
1023 } else {
1024 return Err(ApiError::InvalidRequest(
1025 "No Fly.io machine ID found in deployment metadata; cannot start".to_string(),
1026 ));
1027 }
1028 }
1029
1030 state
1031 .store
1032 .update_hosted_mock_status(deployment_id, DeploymentStatus::Active, None)
1033 .await?;
1034
1035 DeploymentLog::create(pool, deployment_id, "info", "Deployment started", None)
1036 .await
1037 .ok();
1038
1039 let updated = state.store.find_hosted_mock_by_id(deployment_id).await?.ok_or_else(|| {
1040 ApiError::Internal(anyhow::anyhow!("Failed to retrieve updated deployment"))
1041 })?;
1042
1043 Ok(Json(DeploymentResponse::from(updated)))
1044}
1045
1046pub async fn get_deployment_logs(
1048 State(state): State<AppState>,
1049 AuthUser(user_id): AuthUser,
1050 headers: HeaderMap,
1051 Path(deployment_id): Path<Uuid>,
1052) -> ApiResult<Json<Vec<LogResponse>>> {
1053 let pool = state.db.pool();
1054
1055 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1057 .await
1058 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1059
1060 let deployment = HostedMock::find_by_id(pool, deployment_id)
1062 .await
1063 .map_err(ApiError::Database)?
1064 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1065
1066 if deployment.org_id != org_ctx.org_id {
1068 return Err(ApiError::InvalidRequest(
1069 "You don't have access to this deployment".to_string(),
1070 ));
1071 }
1072
1073 let logs = DeploymentLog::find_by_mock(pool, deployment_id, Some(100))
1075 .await
1076 .map_err(ApiError::Database)?;
1077
1078 let responses: Vec<LogResponse> = logs.into_iter().map(LogResponse::from).collect();
1079
1080 Ok(Json(responses))
1081}
1082
1083#[derive(Debug, Deserialize)]
1085pub struct RuntimeLogsQuery {
1086 pub limit: Option<u32>,
1088 pub since: Option<String>,
1090}
1091
1092pub async fn get_runtime_logs(
1100 State(state): State<AppState>,
1101 AuthUser(user_id): AuthUser,
1102 headers: HeaderMap,
1103 Path(deployment_id): Path<Uuid>,
1104 Query(params): Query<RuntimeLogsQuery>,
1105) -> ApiResult<Json<Vec<LogEntry>>> {
1106 let pool = state.db.pool();
1107
1108 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1109 .await
1110 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1111
1112 let deployment = HostedMock::find_by_id(pool, deployment_id)
1113 .await
1114 .map_err(ApiError::Database)?
1115 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1116
1117 if deployment.org_id != org_ctx.org_id {
1118 return Err(ApiError::InvalidRequest(
1119 "You don't have access to this deployment".to_string(),
1120 ));
1121 }
1122
1123 let Some(client) = crate::fly_logs::global() else {
1124 return Ok(Json(Vec::new()));
1129 };
1130
1131 let since = params
1132 .since
1133 .as_deref()
1134 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1135 .map(|d| d.with_timezone(&Utc));
1136
1137 let app_name = deployment.fly_app_name();
1138 let entries = client.fetch_recent(&app_name, since, params.limit).await.map_err(|e| {
1139 warn!(error = %e, app_name = %app_name, "Fly runtime logs fetch failed");
1140 ApiError::Internal(anyhow::anyhow!("Fly logs query failed: {}", e))
1141 })?;
1142
1143 Ok(Json(entries))
1144}
1145
1146pub async fn stream_runtime_logs(
1150 State(state): State<AppState>,
1151 AuthUser(user_id): AuthUser,
1152 headers: HeaderMap,
1153 Path(deployment_id): Path<Uuid>,
1154) -> ApiResult<Sse<impl Stream<Item = Result<Event, Infallible>>>> {
1155 let pool = state.db.pool();
1156 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1157 .await
1158 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1159
1160 let deployment = HostedMock::find_by_id(pool, deployment_id)
1161 .await
1162 .map_err(ApiError::Database)?
1163 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1164
1165 if deployment.org_id != org_ctx.org_id {
1166 return Err(ApiError::InvalidRequest(
1167 "You don't have access to this deployment".to_string(),
1168 ));
1169 }
1170
1171 let app_name = deployment.fly_app_name();
1172 let stream = build_runtime_logs_stream(app_name);
1173
1174 Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
1175}
1176
1177fn build_runtime_logs_stream(app_name: String) -> impl Stream<Item = Result<Event, Infallible>> {
1181 use futures_util::stream::unfold;
1182
1183 struct State {
1187 cursor: DateTime<Utc>,
1188 client: Option<&'static crate::fly_logs::FlyLogsClient>,
1189 app_name: String,
1190 emitted_unconfigured: bool,
1191 }
1192
1193 let state = State {
1194 cursor: Utc::now() - chrono::Duration::seconds(30),
1195 client: crate::fly_logs::global(),
1196 app_name,
1197 emitted_unconfigured: false,
1198 };
1199
1200 unfold(state, |mut st| async move {
1201 if st.client.is_none() && !st.emitted_unconfigured {
1205 st.emitted_unconfigured = true;
1206 let event = Event::default()
1207 .event("config")
1208 .data("Fly runtime logs are not configured (FLYIO_API_TOKEN unset)");
1209 return Some((Ok(event), st));
1210 }
1211
1212 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1213
1214 let Some(client) = st.client else {
1215 let event = Event::default().comment("idle");
1218 return Some((Ok(event), st));
1219 };
1220
1221 match client.fetch_recent(&st.app_name, Some(st.cursor), None).await {
1222 Ok(entries) if entries.is_empty() => {
1223 let event = Event::default().comment("no-new-logs");
1224 Some((Ok(event), st))
1225 }
1226 Ok(entries) => {
1227 if let Some(latest) = entries.iter().map(|e| e.timestamp).max() {
1229 st.cursor = latest;
1230 }
1231 let payload = serde_json::to_string(&entries).unwrap_or_else(|_| "[]".to_string());
1232 let event = Event::default().event("logs").data(payload);
1233 Some((Ok(event), st))
1234 }
1235 Err(err) => {
1236 let payload = serde_json::json!({ "error": err.to_string() }).to_string();
1237 let event = Event::default().event("error").data(payload);
1238 Some((Ok(event), st))
1239 }
1240 }
1241 })
1242}
1243
1244#[derive(Debug, Deserialize, Serialize)]
1248pub struct RuntimeRequestEvent {
1249 pub timestamp: DateTime<Utc>,
1250 pub method: String,
1251 pub path: String,
1252 pub status: u16,
1253 pub latency_ms: u32,
1254 #[serde(default)]
1255 pub matched_route: Option<String>,
1256 #[serde(default)]
1257 pub client_ip: Option<String>,
1258 #[serde(default)]
1259 pub user_agent: Option<String>,
1260 #[serde(default)]
1261 pub request_id: Option<String>,
1262 #[serde(default)]
1263 pub bytes_in: Option<i64>,
1264 #[serde(default)]
1265 pub bytes_out: Option<i64>,
1266}
1267
1268#[derive(Debug, Deserialize)]
1269pub struct IngestPayload {
1270 pub events: Vec<RuntimeRequestEvent>,
1271}
1272
1273#[derive(Debug, Serialize)]
1274pub struct IngestResponse {
1275 pub accepted: usize,
1276}
1277
1278pub async fn ingest_runtime_logs(
1285 State(state): State<AppState>,
1286 Path(deployment_id): Path<Uuid>,
1287 headers: HeaderMap,
1288 Json(payload): Json<IngestPayload>,
1289) -> ApiResult<Json<IngestResponse>> {
1290 let auth = headers
1292 .get("Authorization")
1293 .and_then(|h| h.to_str().ok())
1294 .and_then(|h| h.strip_prefix("Bearer "))
1295 .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1296
1297 let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1298 auth,
1299 &state.config.jwt_secret,
1300 )
1301 .map_err(|e| {
1302 tracing::warn!(error = %e, "Deployment ingest token rejected");
1303 ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1304 })?;
1305
1306 if token_deployment_id != deployment_id {
1307 return Err(ApiError::InvalidRequest(
1308 "Token deployment id does not match URL path".to_string(),
1309 ));
1310 }
1311
1312 if payload.events.is_empty() {
1313 return Ok(Json(IngestResponse { accepted: 0 }));
1314 }
1315
1316 const MAX_BATCH: usize = 500;
1320 let events: Vec<RuntimeRequestEvent> = payload.events.into_iter().take(MAX_BATCH).collect();
1321 let accepted = events.len();
1322
1323 let pool = state.db.pool();
1327 let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1328 for evt in &events {
1329 sqlx::query(
1330 r#"
1331 INSERT INTO runtime_request_logs (
1332 deployment_id, occurred_at, method, path, status, latency_ms,
1333 matched_route, client_ip, user_agent, request_id, bytes_in, bytes_out
1334 )
1335 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
1336 "#,
1337 )
1338 .bind(deployment_id)
1339 .bind(evt.timestamp)
1340 .bind(&evt.method)
1341 .bind(&evt.path)
1342 .bind(evt.status as i16)
1343 .bind(evt.latency_ms as i32)
1344 .bind(evt.matched_route.as_ref())
1345 .bind(evt.client_ip.as_ref())
1346 .bind(evt.user_agent.as_ref())
1347 .bind(evt.request_id.as_ref())
1348 .bind(evt.bytes_in)
1349 .bind(evt.bytes_out)
1350 .execute(&mut *tx)
1351 .await
1352 .map_err(ApiError::Database)?;
1353 }
1354 tx.commit().await.map_err(ApiError::Database)?;
1355
1356 Ok(Json(IngestResponse { accepted }))
1357}
1358
1359#[derive(Debug, Deserialize)]
1369pub struct CaptureIngestRequest {
1370 pub id: String,
1371 pub protocol: String,
1372 pub timestamp: DateTime<Utc>,
1373 pub method: String,
1374 pub path: String,
1375 #[serde(default)]
1376 pub query_params: Option<String>,
1377 pub headers: String,
1378 #[serde(default)]
1379 pub body: Option<String>,
1380 pub body_encoding: String,
1381 #[serde(default)]
1382 pub client_ip: Option<String>,
1383 #[serde(default)]
1384 pub trace_id: Option<String>,
1385 #[serde(default)]
1386 pub span_id: Option<String>,
1387 #[serde(default)]
1388 pub duration_ms: Option<i64>,
1389 #[serde(default)]
1390 pub status_code: Option<i32>,
1391 #[serde(default)]
1392 pub tags: Option<String>,
1393}
1394
1395#[derive(Debug, Deserialize)]
1396pub struct CaptureIngestResponse {
1397 pub status_code: i32,
1398 pub headers: String,
1399 #[serde(default)]
1400 pub body: Option<String>,
1401 pub body_encoding: String,
1402 pub size_bytes: i64,
1403 pub timestamp: DateTime<Utc>,
1404}
1405
1406#[derive(Debug, Deserialize)]
1407pub struct CaptureIngestExchange {
1408 pub request: CaptureIngestRequest,
1409 #[serde(default)]
1410 pub response: Option<CaptureIngestResponse>,
1411}
1412
1413#[derive(Debug, Deserialize)]
1414pub struct CaptureIngestPayload {
1415 pub exchanges: Vec<CaptureIngestExchange>,
1416}
1417
1418pub async fn ingest_runtime_captures(
1424 State(state): State<AppState>,
1425 Path(deployment_id): Path<Uuid>,
1426 headers: HeaderMap,
1427 Json(payload): Json<CaptureIngestPayload>,
1428) -> ApiResult<Json<IngestResponse>> {
1429 let auth = headers
1430 .get("Authorization")
1431 .and_then(|h| h.to_str().ok())
1432 .and_then(|h| h.strip_prefix("Bearer "))
1433 .ok_or_else(|| ApiError::InvalidRequest("Missing deployment ingest token".to_string()))?;
1434
1435 let token_deployment_id = mockforge_registry_core::auth::verify_deployment_ingest_token(
1436 auth,
1437 &state.config.jwt_secret,
1438 )
1439 .map_err(|e| {
1440 tracing::warn!(error = %e, "Capture ingest token rejected");
1441 ApiError::InvalidRequest("Invalid deployment ingest token".to_string())
1442 })?;
1443
1444 if token_deployment_id != deployment_id {
1445 return Err(ApiError::InvalidRequest(
1446 "Token deployment id does not match URL path".to_string(),
1447 ));
1448 }
1449
1450 if payload.exchanges.is_empty() {
1451 return Ok(Json(IngestResponse { accepted: 0 }));
1452 }
1453
1454 const MAX_BATCH: usize = 100;
1458 let exchanges: Vec<CaptureIngestExchange> =
1459 payload.exchanges.into_iter().take(MAX_BATCH).collect();
1460 let accepted = exchanges.len();
1461
1462 let pool = state.db.pool();
1463 let mut tx = pool.begin().await.map_err(ApiError::Database)?;
1464 for exchange in &exchanges {
1465 let req = &exchange.request;
1466 let resp = exchange.response.as_ref();
1467 sqlx::query(
1468 r#"
1469 INSERT INTO runtime_captures (
1470 deployment_id, capture_id, protocol, occurred_at, method, path,
1471 query_params, request_headers, request_body, request_body_encoding,
1472 client_ip, trace_id, span_id, duration_ms, status_code, tags,
1473 response_status_code, response_headers, response_body,
1474 response_body_encoding, response_size_bytes, response_timestamp
1475 )
1476 VALUES (
1477 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
1478 $15, $16, $17, $18, $19, $20, $21, $22
1479 )
1480 ON CONFLICT (deployment_id, capture_id) DO NOTHING
1481 "#,
1482 )
1483 .bind(deployment_id)
1484 .bind(&req.id)
1485 .bind(&req.protocol)
1486 .bind(req.timestamp)
1487 .bind(&req.method)
1488 .bind(&req.path)
1489 .bind(req.query_params.as_ref())
1490 .bind(&req.headers)
1491 .bind(req.body.as_ref())
1492 .bind(&req.body_encoding)
1493 .bind(req.client_ip.as_ref())
1494 .bind(req.trace_id.as_ref())
1495 .bind(req.span_id.as_ref())
1496 .bind(req.duration_ms)
1497 .bind(req.status_code)
1498 .bind(req.tags.as_ref())
1499 .bind(resp.map(|r| r.status_code))
1500 .bind(resp.map(|r| r.headers.as_str()))
1501 .bind(resp.and_then(|r| r.body.as_deref()))
1502 .bind(resp.map(|r| r.body_encoding.as_str()))
1503 .bind(resp.map(|r| r.size_bytes))
1504 .bind(resp.map(|r| r.timestamp))
1505 .execute(&mut *tx)
1506 .await
1507 .map_err(ApiError::Database)?;
1508 }
1509 tx.commit().await.map_err(ApiError::Database)?;
1510
1511 Ok(Json(IngestResponse { accepted }))
1512}
1513
1514#[derive(Debug, Deserialize)]
1515pub struct RuntimeRequestsQuery {
1516 pub limit: Option<u32>,
1518 pub since: Option<String>,
1520}
1521
1522pub async fn get_runtime_requests(
1525 State(state): State<AppState>,
1526 AuthUser(user_id): AuthUser,
1527 headers: HeaderMap,
1528 Path(deployment_id): Path<Uuid>,
1529 Query(params): Query<RuntimeRequestsQuery>,
1530) -> ApiResult<Json<Vec<RuntimeRequestEvent>>> {
1531 let pool = state.db.pool();
1532
1533 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1534 .await
1535 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1536
1537 let deployment = HostedMock::find_by_id(pool, deployment_id)
1538 .await
1539 .map_err(ApiError::Database)?
1540 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1541
1542 if deployment.org_id != org_ctx.org_id {
1543 return Err(ApiError::InvalidRequest(
1544 "You don't have access to this deployment".to_string(),
1545 ));
1546 }
1547
1548 let limit = params.limit.unwrap_or(100).min(500) as i64;
1549 let since = params
1550 .since
1551 .as_deref()
1552 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1553 .map(|d| d.with_timezone(&Utc));
1554
1555 type RuntimeRequestRow = (
1556 DateTime<Utc>,
1557 String,
1558 String,
1559 i16,
1560 i32,
1561 Option<String>,
1562 Option<String>,
1563 Option<String>,
1564 Option<String>,
1565 Option<i64>,
1566 Option<i64>,
1567 );
1568
1569 let rows: Vec<RuntimeRequestRow> = if let Some(since) = since {
1570 sqlx::query_as(
1571 r#"
1572 SELECT occurred_at, method, path, status, latency_ms,
1573 matched_route, client_ip, user_agent, request_id,
1574 bytes_in, bytes_out
1575 FROM runtime_request_logs
1576 WHERE deployment_id = $1 AND occurred_at > $2
1577 ORDER BY occurred_at DESC
1578 LIMIT $3
1579 "#,
1580 )
1581 .bind(deployment_id)
1582 .bind(since)
1583 .bind(limit)
1584 .fetch_all(pool)
1585 .await
1586 .map_err(ApiError::Database)?
1587 } else {
1588 sqlx::query_as(
1589 r#"
1590 SELECT occurred_at, method, path, status, latency_ms,
1591 matched_route, client_ip, user_agent, request_id,
1592 bytes_in, bytes_out
1593 FROM runtime_request_logs
1594 WHERE deployment_id = $1
1595 ORDER BY occurred_at DESC
1596 LIMIT $2
1597 "#,
1598 )
1599 .bind(deployment_id)
1600 .bind(limit)
1601 .fetch_all(pool)
1602 .await
1603 .map_err(ApiError::Database)?
1604 };
1605
1606 let events: Vec<RuntimeRequestEvent> = rows
1607 .into_iter()
1608 .map(|row| RuntimeRequestEvent {
1609 timestamp: row.0,
1610 method: row.1,
1611 path: row.2,
1612 status: row.3 as u16,
1613 latency_ms: row.4 as u32,
1614 matched_route: row.5,
1615 client_ip: row.6,
1616 user_agent: row.7,
1617 request_id: row.8,
1618 bytes_in: row.9,
1619 bytes_out: row.10,
1620 })
1621 .collect();
1622
1623 Ok(Json(events))
1624}
1625
1626async fn proxy_to_deployment_recorder(
1639 deployment: &HostedMock,
1640 path_and_query: &str,
1641) -> ApiResult<axum::http::Response<axum::body::Body>> {
1642 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1643 let Some(base) = base else {
1644 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1645 };
1646 let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1647
1648 let client = reqwest::Client::builder()
1649 .timeout(std::time::Duration::from_secs(10))
1650 .build()
1651 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1652
1653 let resp =
1654 client.get(&url).send().await.map_err(|e| {
1655 ApiError::Internal(anyhow::anyhow!("Recorder proxy fetch failed: {}", e))
1656 })?;
1657
1658 let status = resp.status();
1659 let headers = resp.headers().clone();
1660 let body = resp.bytes().await.map_err(|e| {
1661 ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
1662 })?;
1663
1664 let mut builder = axum::http::Response::builder().status(status);
1665 if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
1666 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
1667 }
1668 builder.body(axum::body::Body::from(body)).map_err(|e| {
1669 ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
1670 })
1671}
1672
1673async fn proxy_to_deployment_state_machines(
1678 deployment: &HostedMock,
1679 path_and_query: &str,
1680) -> ApiResult<axum::http::Response<axum::body::Body>> {
1681 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
1682 let Some(base) = base else {
1683 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
1684 };
1685 let url = format!("{}{}", base.trim_end_matches('/'), path_and_query);
1686
1687 let client = reqwest::Client::builder()
1688 .timeout(std::time::Duration::from_secs(10))
1689 .build()
1690 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
1691
1692 let resp = client.get(&url).send().await.map_err(|e| {
1693 ApiError::Internal(anyhow::anyhow!("State-machine proxy fetch failed: {}", e))
1694 })?;
1695 let status = resp.status();
1696 let headers = resp.headers().clone();
1697 let body = resp.bytes().await.map_err(|e| {
1698 ApiError::Internal(anyhow::anyhow!("State-machine proxy read body failed: {}", e))
1699 })?;
1700
1701 let mut builder = axum::http::Response::builder().status(status);
1702 if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
1703 builder = builder.header(axum::http::header::CONTENT_TYPE, ct);
1704 }
1705 builder.body(axum::body::Body::from(body)).map_err(|e| {
1706 ApiError::Internal(anyhow::anyhow!("State-machine proxy response build failed: {}", e))
1707 })
1708}
1709
1710pub async fn list_deployment_state_machines(
1713 State(state): State<AppState>,
1714 AuthUser(user_id): AuthUser,
1715 headers: HeaderMap,
1716 Path(deployment_id): Path<Uuid>,
1717) -> ApiResult<axum::http::Response<axum::body::Body>> {
1718 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1719 proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines").await
1720}
1721
1722pub async fn get_deployment_state_machine(
1725 State(state): State<AppState>,
1726 AuthUser(user_id): AuthUser,
1727 headers: HeaderMap,
1728 Path((deployment_id, resource_type)): Path<(Uuid, String)>,
1729) -> ApiResult<axum::http::Response<axum::body::Body>> {
1730 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1731 if resource_type.contains('/') || resource_type.contains('?') || resource_type.contains('#') {
1732 return Err(ApiError::InvalidRequest("Invalid resource type".to_string()));
1733 }
1734 let path = format!("/__mockforge/api/state-machines/{}", urlencoding::encode(&resource_type));
1735 proxy_to_deployment_state_machines(&deployment, &path).await
1736}
1737
1738pub async fn list_deployment_state_machine_instances(
1741 State(state): State<AppState>,
1742 AuthUser(user_id): AuthUser,
1743 headers: HeaderMap,
1744 Path(deployment_id): Path<Uuid>,
1745) -> ApiResult<axum::http::Response<axum::body::Body>> {
1746 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
1747 proxy_to_deployment_state_machines(&deployment, "/__mockforge/api/state-machines/instances")
1748 .await
1749}
1750
1751#[derive(Debug, Deserialize)]
1752pub struct RecorderCapturesQuery {
1753 pub limit: Option<u32>,
1754 pub since: Option<String>,
1755}
1756
1757pub async fn list_recorder_captures(
1766 State(state): State<AppState>,
1767 AuthUser(user_id): AuthUser,
1768 headers: HeaderMap,
1769 Path(deployment_id): Path<Uuid>,
1770 Query(params): Query<RecorderCapturesQuery>,
1771) -> ApiResult<axum::http::Response<axum::body::Body>> {
1772 let pool = state.db.pool();
1773
1774 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1775 .await
1776 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1777
1778 let deployment = HostedMock::find_by_id(pool, deployment_id)
1779 .await
1780 .map_err(ApiError::Database)?
1781 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1782
1783 if deployment.org_id != org_ctx.org_id {
1784 return Err(ApiError::InvalidRequest(
1785 "You don't have access to this deployment".to_string(),
1786 ));
1787 }
1788
1789 let has_synced: bool = sqlx::query_scalar(
1793 "SELECT EXISTS(SELECT 1 FROM runtime_captures WHERE deployment_id = $1 LIMIT 1)",
1794 )
1795 .bind(deployment_id)
1796 .fetch_one(pool)
1797 .await
1798 .map_err(ApiError::Database)?;
1799
1800 if has_synced {
1801 let limit = params.limit.unwrap_or(100).min(500) as i64;
1802 let since = params
1803 .since
1804 .as_deref()
1805 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
1806 .map(|d| d.with_timezone(&Utc));
1807 let captures = list_cloud_captures(pool, deployment_id, limit, since).await?;
1808 let body = serde_json::to_vec(&captures).map_err(|e| {
1809 ApiError::InvalidRequest(format!("Failed to serialize captures: {}", e))
1810 })?;
1811 return Ok(axum::http::Response::builder()
1812 .status(axum::http::StatusCode::OK)
1813 .header("content-type", "application/json")
1814 .body(axum::body::Body::from(body))
1815 .unwrap());
1816 }
1817
1818 let mut qs = String::from("/api/recorder/requests");
1819 let mut sep = '?';
1820 if let Some(limit) = params.limit {
1821 qs.push(sep);
1822 qs.push_str(&format!("limit={}", limit));
1823 sep = '&';
1824 }
1825 if let Some(since) = params.since.as_deref() {
1826 qs.push(sep);
1827 qs.push_str(&format!("since={}", urlencoding::encode(since)));
1828 }
1829
1830 let _ = state;
1831 proxy_to_deployment_recorder(&deployment, &qs).await
1832}
1833
1834async fn list_cloud_captures(
1837 pool: &sqlx::PgPool,
1838 deployment_id: Uuid,
1839 limit: i64,
1840 since: Option<DateTime<Utc>>,
1841) -> ApiResult<Vec<serde_json::Value>> {
1842 type Row = (
1843 String, String, DateTime<Utc>, String, String, Option<String>, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<i64>, Option<i32>, Option<String>, );
1859
1860 let rows: Vec<Row> = if let Some(since) = since {
1861 sqlx::query_as(
1862 r#"
1863 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1864 request_headers, request_body, request_body_encoding,
1865 client_ip, trace_id, span_id, duration_ms, status_code, tags
1866 FROM runtime_captures
1867 WHERE deployment_id = $1 AND occurred_at > $2
1868 ORDER BY occurred_at DESC
1869 LIMIT $3
1870 "#,
1871 )
1872 .bind(deployment_id)
1873 .bind(since)
1874 .bind(limit)
1875 .fetch_all(pool)
1876 .await
1877 .map_err(ApiError::Database)?
1878 } else {
1879 sqlx::query_as(
1880 r#"
1881 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1882 request_headers, request_body, request_body_encoding,
1883 client_ip, trace_id, span_id, duration_ms, status_code, tags
1884 FROM runtime_captures
1885 WHERE deployment_id = $1
1886 ORDER BY occurred_at DESC
1887 LIMIT $2
1888 "#,
1889 )
1890 .bind(deployment_id)
1891 .bind(limit)
1892 .fetch_all(pool)
1893 .await
1894 .map_err(ApiError::Database)?
1895 };
1896
1897 let captures = rows
1898 .into_iter()
1899 .map(|r| {
1900 serde_json::json!({
1901 "id": r.0,
1902 "protocol": r.1,
1903 "timestamp": r.2,
1904 "method": r.3,
1905 "path": r.4,
1906 "query_params": r.5,
1907 "headers": r.6,
1908 "body": r.7,
1909 "body_encoding": r.8,
1910 "client_ip": r.9,
1911 "trace_id": r.10,
1912 "span_id": r.11,
1913 "duration_ms": r.12,
1914 "status_code": r.13,
1915 "tags": r.14,
1916 })
1917 })
1918 .collect();
1919 Ok(captures)
1920}
1921
1922pub async fn get_recorder_capture(
1925 State(state): State<AppState>,
1926 AuthUser(user_id): AuthUser,
1927 headers: HeaderMap,
1928 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
1929) -> ApiResult<axum::http::Response<axum::body::Body>> {
1930 let pool = state.db.pool();
1931
1932 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
1933 .await
1934 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
1935
1936 let deployment = HostedMock::find_by_id(pool, deployment_id)
1937 .await
1938 .map_err(ApiError::Database)?
1939 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
1940
1941 if deployment.org_id != org_ctx.org_id {
1942 return Err(ApiError::InvalidRequest(
1943 "You don't have access to this deployment".to_string(),
1944 ));
1945 }
1946
1947 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
1950 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
1951 }
1952
1953 if let Some(row) = fetch_cloud_capture(pool, deployment_id, &capture_id).await? {
1954 let body = serde_json::to_vec(&row)
1955 .map_err(|e| ApiError::InvalidRequest(format!("Failed to serialize capture: {}", e)))?;
1956 return Ok(axum::http::Response::builder()
1957 .status(axum::http::StatusCode::OK)
1958 .header("content-type", "application/json")
1959 .body(axum::body::Body::from(body))
1960 .unwrap());
1961 }
1962
1963 let path = format!("/api/recorder/requests/{}", urlencoding::encode(&capture_id));
1964 let _ = state;
1965 proxy_to_deployment_recorder(&deployment, &path).await
1966}
1967
1968async fn fetch_cloud_capture(
1971 pool: &sqlx::PgPool,
1972 deployment_id: Uuid,
1973 capture_id: &str,
1974) -> ApiResult<Option<serde_json::Value>> {
1975 type Row = (
1976 String,
1977 String,
1978 DateTime<Utc>,
1979 String,
1980 String,
1981 Option<String>,
1982 String,
1983 Option<String>,
1984 String,
1985 Option<String>,
1986 Option<String>,
1987 Option<String>,
1988 Option<i64>,
1989 Option<i32>,
1990 Option<String>,
1991 );
1992 let row: Option<Row> = sqlx::query_as(
1993 r#"
1994 SELECT capture_id, protocol, occurred_at, method, path, query_params,
1995 request_headers, request_body, request_body_encoding,
1996 client_ip, trace_id, span_id, duration_ms, status_code, tags
1997 FROM runtime_captures
1998 WHERE deployment_id = $1 AND capture_id = $2
1999 LIMIT 1
2000 "#,
2001 )
2002 .bind(deployment_id)
2003 .bind(capture_id)
2004 .fetch_optional(pool)
2005 .await
2006 .map_err(ApiError::Database)?;
2007 Ok(row.map(|r| {
2008 serde_json::json!({
2009 "id": r.0,
2010 "protocol": r.1,
2011 "timestamp": r.2,
2012 "method": r.3,
2013 "path": r.4,
2014 "query_params": r.5,
2015 "headers": r.6,
2016 "body": r.7,
2017 "body_encoding": r.8,
2018 "client_ip": r.9,
2019 "trace_id": r.10,
2020 "span_id": r.11,
2021 "duration_ms": r.12,
2022 "status_code": r.13,
2023 "tags": r.14,
2024 })
2025 }))
2026}
2027
2028pub async fn get_recorder_capture_response(
2032 State(state): State<AppState>,
2033 AuthUser(user_id): AuthUser,
2034 headers: HeaderMap,
2035 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2036) -> ApiResult<axum::http::Response<axum::body::Body>> {
2037 let pool = state.db.pool();
2038
2039 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2040 .await
2041 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2042
2043 let deployment = HostedMock::find_by_id(pool, deployment_id)
2044 .await
2045 .map_err(ApiError::Database)?
2046 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2047
2048 if deployment.org_id != org_ctx.org_id {
2049 return Err(ApiError::InvalidRequest(
2050 "You don't have access to this deployment".to_string(),
2051 ));
2052 }
2053
2054 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2055 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2056 }
2057
2058 if let Some(row) = fetch_cloud_capture_response(pool, deployment_id, &capture_id).await? {
2059 let body = serde_json::to_vec(&row).map_err(|e| {
2060 ApiError::InvalidRequest(format!("Failed to serialize response: {}", e))
2061 })?;
2062 return Ok(axum::http::Response::builder()
2063 .status(axum::http::StatusCode::OK)
2064 .header("content-type", "application/json")
2065 .body(axum::body::Body::from(body))
2066 .unwrap());
2067 }
2068
2069 let path = format!("/api/recorder/requests/{}/response", urlencoding::encode(&capture_id));
2070 let _ = state;
2071 proxy_to_deployment_recorder(&deployment, &path).await
2072}
2073
2074async fn fetch_cloud_capture_response(
2078 pool: &sqlx::PgPool,
2079 deployment_id: Uuid,
2080 capture_id: &str,
2081) -> ApiResult<Option<serde_json::Value>> {
2082 type Row = (Option<i32>, Option<String>, Option<String>, Option<String>, Option<i64>);
2083 let row: Option<Row> = sqlx::query_as(
2084 r#"
2085 SELECT response_status_code, response_headers, response_body,
2086 response_body_encoding, response_size_bytes
2087 FROM runtime_captures
2088 WHERE deployment_id = $1 AND capture_id = $2
2089 LIMIT 1
2090 "#,
2091 )
2092 .bind(deployment_id)
2093 .bind(capture_id)
2094 .fetch_optional(pool)
2095 .await
2096 .map_err(ApiError::Database)?;
2097 Ok(row.and_then(|r| {
2098 let status_code = r.0?;
2099 Some(serde_json::json!({
2100 "status_code": status_code,
2101 "headers": r.1.unwrap_or_else(|| "{}".to_string()),
2102 "body": r.2,
2103 "body_encoding": r.3.unwrap_or_else(|| "utf8".to_string()),
2104 "size_bytes": r.4.unwrap_or(0),
2105 }))
2106 }))
2107}
2108
2109async fn proxy_post_to_deployment_recorder(
2114 deployment: &HostedMock,
2115 path: &str,
2116) -> ApiResult<axum::http::Response<axum::body::Body>> {
2117 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2118 let Some(base) = base else {
2119 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2120 };
2121 let url = format!("{}{}", base.trim_end_matches('/'), path);
2122
2123 let client = reqwest::Client::builder()
2124 .timeout(std::time::Duration::from_secs(10))
2125 .build()
2126 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2127
2128 let resp =
2129 client.post(&url).send().await.map_err(|e| {
2130 ApiError::Internal(anyhow::anyhow!("Recorder proxy POST failed: {}", e))
2131 })?;
2132
2133 let status = resp.status();
2134 let headers = resp.headers().clone();
2135 let body = resp.bytes().await.map_err(|e| {
2136 ApiError::Internal(anyhow::anyhow!("Recorder proxy read body failed: {}", e))
2137 })?;
2138
2139 let mut builder = axum::http::Response::builder().status(status);
2140 if let Some(content_type) = headers.get(axum::http::header::CONTENT_TYPE) {
2141 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2142 }
2143 builder.body(axum::body::Body::from(body)).map_err(|e| {
2144 ApiError::Internal(anyhow::anyhow!("Recorder proxy response build failed: {}", e))
2145 })
2146}
2147
2148async fn check_org_access(
2149 state: &AppState,
2150 user_id: Uuid,
2151 headers: &HeaderMap,
2152 deployment_id: Uuid,
2153) -> ApiResult<HostedMock> {
2154 let pool = state.db.pool();
2155
2156 let org_ctx = resolve_org_context(state, user_id, headers, None)
2157 .await
2158 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2159
2160 let deployment = HostedMock::find_by_id(pool, deployment_id)
2161 .await
2162 .map_err(ApiError::Database)?
2163 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2164
2165 if deployment.org_id != org_ctx.org_id {
2166 return Err(ApiError::InvalidRequest(
2167 "You don't have access to this deployment".to_string(),
2168 ));
2169 }
2170 Ok(deployment)
2171}
2172
2173pub async fn get_recorder_status(
2177 State(state): State<AppState>,
2178 AuthUser(user_id): AuthUser,
2179 headers: HeaderMap,
2180 Path(deployment_id): Path<Uuid>,
2181) -> ApiResult<axum::http::Response<axum::body::Body>> {
2182 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2183 proxy_to_deployment_recorder(&deployment, "/api/recorder/status").await
2184}
2185
2186pub async fn enable_recorder(
2188 State(state): State<AppState>,
2189 AuthUser(user_id): AuthUser,
2190 headers: HeaderMap,
2191 Path(deployment_id): Path<Uuid>,
2192) -> ApiResult<axum::http::Response<axum::body::Body>> {
2193 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2194 proxy_post_to_deployment_recorder(&deployment, "/api/recorder/enable").await
2195}
2196
2197pub async fn disable_recorder(
2199 State(state): State<AppState>,
2200 AuthUser(user_id): AuthUser,
2201 headers: HeaderMap,
2202 Path(deployment_id): Path<Uuid>,
2203) -> ApiResult<axum::http::Response<axum::body::Body>> {
2204 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2205 proxy_post_to_deployment_recorder(&deployment, "/api/recorder/disable").await
2206}
2207
2208pub async fn clear_recorder(
2213 State(state): State<AppState>,
2214 AuthUser(user_id): AuthUser,
2215 headers: HeaderMap,
2216 Path(deployment_id): Path<Uuid>,
2217) -> ApiResult<axum::http::Response<axum::body::Body>> {
2218 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2219 let base = deployment.internal_url.as_deref().or(deployment.deployment_url.as_deref());
2221 let Some(base) = base else {
2222 return Err(ApiError::InvalidRequest("Deployment has no resolved URL yet".to_string()));
2223 };
2224 let url = format!("{}/api/recorder/clear", base.trim_end_matches('/'));
2225 let client = reqwest::Client::builder()
2226 .timeout(std::time::Duration::from_secs(10))
2227 .build()
2228 .map_err(|e| ApiError::Internal(anyhow::anyhow!("HTTP client init failed: {}", e)))?;
2229 let resp =
2230 client.delete(&url).send().await.map_err(|e| {
2231 ApiError::Internal(anyhow::anyhow!("Recorder clear proxy failed: {}", e))
2232 })?;
2233 let status = resp.status();
2234 let headers_resp = resp.headers().clone();
2235 let body = resp.bytes().await.map_err(|e| {
2236 ApiError::Internal(anyhow::anyhow!("Recorder clear read body failed: {}", e))
2237 })?;
2238 let mut builder = axum::http::Response::builder().status(status);
2239 if let Some(content_type) = headers_resp.get(axum::http::header::CONTENT_TYPE) {
2240 builder = builder.header(axum::http::header::CONTENT_TYPE, content_type);
2241 }
2242 builder.body(axum::body::Body::from(body)).map_err(|e| {
2243 ApiError::Internal(anyhow::anyhow!("Recorder clear response build failed: {}", e))
2244 })
2245}
2246
2247pub async fn replay_recorder_capture(
2252 State(state): State<AppState>,
2253 AuthUser(user_id): AuthUser,
2254 headers: HeaderMap,
2255 Path((deployment_id, capture_id)): Path<(Uuid, String)>,
2256) -> ApiResult<axum::http::Response<axum::body::Body>> {
2257 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2258 if capture_id.contains('/') || capture_id.contains('?') || capture_id.contains('#') {
2259 return Err(ApiError::InvalidRequest("Invalid capture id".to_string()));
2260 }
2261 let path = format!("/api/recorder/replay/{}", urlencoding::encode(&capture_id));
2262 proxy_post_to_deployment_recorder(&deployment, &path).await
2263}
2264
2265pub async fn export_recorder_captures_har(
2270 State(state): State<AppState>,
2271 AuthUser(user_id): AuthUser,
2272 headers: HeaderMap,
2273 Path(deployment_id): Path<Uuid>,
2274) -> ApiResult<axum::http::Response<axum::body::Body>> {
2275 let pool = state.db.pool();
2276
2277 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2278 .await
2279 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2280
2281 let deployment = HostedMock::find_by_id(pool, deployment_id)
2282 .await
2283 .map_err(ApiError::Database)?
2284 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2285
2286 if deployment.org_id != org_ctx.org_id {
2287 return Err(ApiError::InvalidRequest(
2288 "You don't have access to this deployment".to_string(),
2289 ));
2290 }
2291
2292 let _ = state;
2293 proxy_to_deployment_recorder(&deployment, "/api/recorder/export/har").await
2294}
2295
2296pub async fn export_recorder_captures_jsonl(
2302 State(state): State<AppState>,
2303 AuthUser(user_id): AuthUser,
2304 headers: HeaderMap,
2305 Path(deployment_id): Path<Uuid>,
2306) -> ApiResult<axum::http::Response<axum::body::Body>> {
2307 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2308 proxy_to_deployment_recorder(&deployment, "/api/recorder/export/jsonl").await
2309}
2310
2311pub async fn get_deployment_metrics(
2313 State(state): State<AppState>,
2314 AuthUser(user_id): AuthUser,
2315 headers: HeaderMap,
2316 Path(deployment_id): Path<Uuid>,
2317) -> ApiResult<Json<MetricsResponse>> {
2318 let pool = state.db.pool();
2319
2320 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2322 .await
2323 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2324
2325 let deployment = HostedMock::find_by_id(pool, deployment_id)
2327 .await
2328 .map_err(ApiError::Database)?
2329 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2330
2331 if deployment.org_id != org_ctx.org_id {
2333 return Err(ApiError::InvalidRequest(
2334 "You don't have access to this deployment".to_string(),
2335 ));
2336 }
2337
2338 if let Some(client) = crate::fly_metrics::global() {
2343 let app_name = deployment.fly_app_name();
2344 match client.snapshot_for_app(&app_name).await {
2345 Ok(snap) => {
2346 use chrono::Datelike;
2347 let now = Utc::now().date_naive();
2348 let period_start =
2349 chrono::NaiveDate::from_ymd_opt(now.year(), now.month(), 1).unwrap_or(now);
2350 return Ok(Json(MetricsResponse {
2351 requests: snap.requests,
2352 requests_2xx: snap.requests_2xx,
2353 requests_4xx: snap.requests_4xx,
2354 requests_5xx: snap.requests_5xx,
2355 egress_bytes: snap.egress_bytes,
2356 avg_response_time_ms: snap.avg_response_time_ms,
2357 period_start,
2358 }));
2359 }
2360 Err(err) => {
2361 tracing::warn!(
2362 app_name = %app_name,
2363 error = %err,
2364 "Fly Prometheus metrics query failed; falling back to local counters"
2365 );
2366 }
2367 }
2368 }
2369
2370 let metrics = DeploymentMetrics::get_or_create_current(pool, deployment_id)
2373 .await
2374 .map_err(ApiError::Database)?;
2375
2376 Ok(Json(MetricsResponse::from(metrics)))
2377}
2378
2379pub async fn upload_spec(
2381 State(state): State<AppState>,
2382 AuthUser(user_id): AuthUser,
2383 headers: HeaderMap,
2384 mut multipart: Multipart,
2385) -> ApiResult<Json<SpecUploadResponse>> {
2386 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2388 .await
2389 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2390
2391 let checker = PermissionChecker::new(&state);
2393 checker
2394 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2395 .await?;
2396
2397 let mut file_data: Option<Vec<u8>> = None;
2399 let mut file_name = String::from("spec");
2400
2401 while let Some(field) = multipart
2402 .next_field()
2403 .await
2404 .map_err(|e| ApiError::InvalidRequest(format!("Failed to read multipart field: {}", e)))?
2405 {
2406 if field.name() == Some("file") || field.name() == Some("spec") {
2407 if let Some(name) = field.file_name() {
2408 file_name =
2409 name.to_string().replace(".yaml", "").replace(".yml", "").replace(".json", "");
2410 }
2411 let data = field.bytes().await.map_err(|e| {
2412 ApiError::InvalidRequest(format!("Failed to read file data: {}", e))
2413 })?;
2414
2415 let content = String::from_utf8(data.to_vec()).map_err(|_| {
2417 ApiError::InvalidRequest("File must be valid UTF-8 text".to_string())
2418 })?;
2419
2420 let spec_value: serde_json::Value =
2422 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&content) {
2423 v
2424 } else if let Ok(v) = serde_yaml::from_str::<serde_json::Value>(&content) {
2425 v
2426 } else {
2427 return Err(ApiError::InvalidRequest(
2428 "File must be a valid JSON or YAML OpenAPI specification".to_string(),
2429 ));
2430 };
2431
2432 if spec_value.get("openapi").is_none() && spec_value.get("swagger").is_none() {
2434 return Err(ApiError::InvalidRequest(
2435 "File must contain an 'openapi' or 'swagger' field".to_string(),
2436 ));
2437 }
2438
2439 let json_data = serde_json::to_vec_pretty(&spec_value).map_err(|e| {
2441 ApiError::Internal(anyhow::anyhow!("Failed to serialize spec: {}", e))
2442 })?;
2443
2444 file_data = Some(json_data);
2445 }
2446 }
2447
2448 let data = file_data.ok_or_else(|| {
2449 ApiError::InvalidRequest("No 'file' or 'spec' field in upload".to_string())
2450 })?;
2451
2452 let url = state
2454 .storage
2455 .upload_spec(&org_ctx.org_id.to_string(), &file_name, data)
2456 .await
2457 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to upload spec: {}", e)))?;
2458
2459 Ok(Json(SpecUploadResponse { url }))
2460}
2461
2462#[derive(Debug, Serialize)]
2463pub struct SpecUploadResponse {
2464 pub url: String,
2465}
2466
2467#[derive(Debug, Deserialize)]
2470pub struct CreateDeploymentRequest {
2471 pub name: String,
2472 pub slug: Option<String>,
2473 pub description: Option<String>,
2474 pub project_id: Option<Uuid>,
2475 pub config_json: serde_json::Value,
2476 pub openapi_spec_url: Option<String>,
2477 pub region: Option<String>,
2478 #[serde(default)]
2483 pub enabled_protocols: Option<Vec<crate::models::Protocol>>,
2484 #[serde(default)]
2489 pub upstream_url: Option<String>,
2490}
2491
2492#[derive(Debug, Deserialize)]
2493pub struct UpdateStatusRequest {
2494 pub status: String,
2495 pub error_message: Option<String>,
2496 pub deployment_url: Option<String>,
2497 pub internal_url: Option<String>,
2498}
2499
2500#[derive(Debug, Serialize)]
2501pub struct DeploymentResponse {
2502 pub id: Uuid,
2503 pub org_id: Uuid,
2504 pub project_id: Option<Uuid>,
2505 pub name: String,
2506 pub slug: String,
2507 pub description: Option<String>,
2508 pub status: String,
2509 pub deployment_url: Option<String>,
2510 pub openapi_spec_url: Option<String>,
2511 pub region: String,
2512 pub instance_type: String,
2513 pub health_status: String,
2514 pub error_message: Option<String>,
2515 pub enabled_protocols: Vec<crate::models::Protocol>,
2516 pub upstream_url: Option<String>,
2520 pub created_at: DateTime<Utc>,
2521 pub updated_at: DateTime<Utc>,
2522}
2523
2524impl From<HostedMock> for DeploymentResponse {
2525 fn from(mock: HostedMock) -> Self {
2526 let status = mock.status().to_string();
2527 let health_status = mock.health_status().to_string();
2528 let enabled_protocols = mock.enabled_protocols();
2529 let upstream_url = mock.upstream_url();
2530 Self {
2531 id: mock.id,
2532 org_id: mock.org_id,
2533 project_id: mock.project_id,
2534 name: mock.name,
2535 slug: mock.slug,
2536 description: mock.description,
2537 status,
2538 deployment_url: mock.deployment_url,
2539 openapi_spec_url: mock.openapi_spec_url,
2540 region: mock.region,
2541 instance_type: mock.instance_type,
2542 health_status,
2543 error_message: mock.error_message,
2544 enabled_protocols,
2545 upstream_url,
2546 created_at: mock.created_at,
2547 updated_at: mock.updated_at,
2548 }
2549 }
2550}
2551
2552#[derive(Debug, Serialize)]
2553pub struct LogResponse {
2554 pub id: Uuid,
2555 pub level: String,
2556 pub message: String,
2557 pub metadata: serde_json::Value,
2558 pub created_at: DateTime<Utc>,
2559}
2560
2561impl From<DeploymentLog> for LogResponse {
2562 fn from(log: DeploymentLog) -> Self {
2563 Self {
2564 id: log.id,
2565 level: log.level,
2566 message: log.message,
2567 metadata: log.metadata_json,
2568 created_at: log.created_at,
2569 }
2570 }
2571}
2572
2573#[derive(Debug, Serialize)]
2574pub struct MetricsResponse {
2575 pub requests: i64,
2576 pub requests_2xx: i64,
2577 pub requests_4xx: i64,
2578 pub requests_5xx: i64,
2579 pub egress_bytes: i64,
2580 pub avg_response_time_ms: i64,
2581 pub period_start: chrono::NaiveDate,
2582}
2583
2584impl From<DeploymentMetrics> for MetricsResponse {
2585 fn from(metrics: DeploymentMetrics) -> Self {
2586 Self {
2587 requests: metrics.requests,
2588 requests_2xx: metrics.requests_2xx,
2589 requests_4xx: metrics.requests_4xx,
2590 requests_5xx: metrics.requests_5xx,
2591 egress_bytes: metrics.egress_bytes,
2592 avg_response_time_ms: metrics.avg_response_time_ms,
2593 period_start: metrics.period_start,
2594 }
2595 }
2596}
2597
2598#[derive(Debug, Deserialize)]
2599pub struct SetDomainRequest {
2600 pub domain: String,
2601}
2602
2603pub async fn set_domain(
2609 State(state): State<AppState>,
2610 AuthUser(user_id): AuthUser,
2611 headers: HeaderMap,
2612 Path(deployment_id): Path<Uuid>,
2613 Json(request): Json<SetDomainRequest>,
2614) -> ApiResult<Json<serde_json::Value>> {
2615 let pool = state.db.pool();
2616
2617 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2619 .await
2620 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2621
2622 let checker = PermissionChecker::new(&state);
2624 checker
2625 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2626 .await?;
2627
2628 let deployment = HostedMock::find_by_id(pool, deployment_id)
2630 .await
2631 .map_err(ApiError::Database)?
2632 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2633
2634 if deployment.org_id != org_ctx.org_id {
2636 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2637 }
2638
2639 let hostname = format!("{}.{}", deployment.slug, request.domain);
2640
2641 let new_url = format!("https://{}", hostname);
2648 sqlx::query(
2649 r#"
2650 UPDATE hosted_mocks
2651 SET deployment_url = $1,
2652 metadata_json = jsonb_set(
2653 COALESCE(metadata_json, '{}'::jsonb),
2654 '{custom_domain}',
2655 to_jsonb($2::text)
2656 ),
2657 updated_at = NOW()
2658 WHERE id = $3
2659 "#,
2660 )
2661 .bind(&new_url)
2662 .bind(&hostname)
2663 .bind(deployment_id)
2664 .execute(pool)
2665 .await
2666 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to update deployment URL: {}", e)))?;
2667
2668 DeploymentLog::create(
2669 pool,
2670 deployment_id,
2671 "info",
2672 &format!("Custom domain set: {}", hostname),
2673 None,
2674 )
2675 .await
2676 .ok();
2677
2678 Ok(Json(serde_json::json!({
2679 "hostname": hostname,
2680 "deployment_url": new_url,
2681 })))
2682}
2683
2684pub async fn get_custom_domain(
2687 State(state): State<AppState>,
2688 AuthUser(user_id): AuthUser,
2689 headers: HeaderMap,
2690 Path(deployment_id): Path<Uuid>,
2691) -> ApiResult<Json<serde_json::Value>> {
2692 let deployment = check_org_access(&state, user_id, &headers, deployment_id).await?;
2693 let hostname = deployment
2694 .metadata_json
2695 .get("custom_domain")
2696 .and_then(|v| v.as_str())
2697 .map(|s| s.to_string());
2698 Ok(Json(serde_json::json!({
2699 "hostname": hostname,
2700 "deployment_url": deployment.deployment_url,
2701 })))
2702}
2703
2704pub async fn clear_custom_domain(
2710 State(state): State<AppState>,
2711 AuthUser(user_id): AuthUser,
2712 headers: HeaderMap,
2713 Path(deployment_id): Path<Uuid>,
2714) -> ApiResult<Json<serde_json::Value>> {
2715 let pool = state.db.pool();
2716
2717 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2718 .await
2719 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2720
2721 let checker = PermissionChecker::new(&state);
2722 checker
2723 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockCreate)
2724 .await?;
2725
2726 let deployment = HostedMock::find_by_id(pool, deployment_id)
2727 .await
2728 .map_err(ApiError::Database)?
2729 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2730 if deployment.org_id != org_ctx.org_id {
2731 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2732 }
2733
2734 let app_name = deployment.fly_app_name();
2735 let default_url = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
2736 format!("https://{}.{}", deployment.slug, domain)
2737 } else {
2738 format!("https://{}.fly.dev", app_name)
2739 };
2740
2741 sqlx::query(
2742 r#"
2743 UPDATE hosted_mocks
2744 SET deployment_url = $1,
2745 metadata_json = COALESCE(metadata_json, '{}'::jsonb) - 'custom_domain',
2746 updated_at = NOW()
2747 WHERE id = $2
2748 "#,
2749 )
2750 .bind(&default_url)
2751 .bind(deployment_id)
2752 .execute(pool)
2753 .await
2754 .map_err(|e| ApiError::Internal(anyhow::anyhow!("Failed to clear custom domain: {}", e)))?;
2755
2756 DeploymentLog::create(pool, deployment_id, "info", "Custom domain removed", None)
2757 .await
2758 .ok();
2759
2760 Ok(Json(serde_json::json!({
2761 "hostname": serde_json::Value::Null,
2762 "deployment_url": default_url,
2763 })))
2764}
2765
2766#[derive(Debug, Default, Deserialize)]
2771#[serde(default, rename_all = "camelCase")]
2772pub struct TriggerSmokeRunRequest {
2773 pub latency_budget_ms: Option<u64>,
2775 pub methods: Option<Vec<String>>,
2779}
2780
2781pub async fn trigger_smoke_run(
2805 State(state): State<AppState>,
2806 AuthUser(user_id): AuthUser,
2807 Path(deployment_id): Path<Uuid>,
2808 headers: HeaderMap,
2809 body: Option<Json<TriggerSmokeRunRequest>>,
2810) -> ApiResult<Json<TestRun>> {
2811 let req = body.map(|Json(r)| r).unwrap_or_default();
2812
2813 let org_ctx = resolve_org_context(&state, user_id, &headers, None)
2815 .await
2816 .map_err(|_| ApiError::InvalidRequest("Organization not found".to_string()))?;
2817
2818 let checker = PermissionChecker::new(&state);
2819 checker
2820 .require_permission(user_id, org_ctx.org_id, Permission::HostedMockUpdate)
2821 .await?;
2822
2823 let deployment = state
2824 .store
2825 .find_hosted_mock_by_id(deployment_id)
2826 .await?
2827 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".to_string()))?;
2828 if deployment.org_id != org_ctx.org_id {
2829 return Err(ApiError::InvalidRequest("Deployment not found".to_string()));
2831 }
2832 if deployment.status != "active" {
2833 return Err(ApiError::InvalidRequest(format!(
2840 "Deployment is in '{}' status; smoke runs require 'active'",
2841 deployment.status,
2842 )));
2843 }
2844
2845 let limits = crate::handlers::usage::effective_limits(&state, &org_ctx.org).await?;
2847 let max_concurrent = limits.get("max_concurrent_runs").and_then(|v| v.as_i64()).unwrap_or(0);
2848 if max_concurrent == 0 {
2849 return Err(ApiError::ResourceLimitExceeded(
2850 "Test execution is not enabled on this plan".into(),
2851 ));
2852 }
2853 if max_concurrent > 0 {
2854 let inflight = TestRun::count_inflight(state.db.pool(), org_ctx.org_id)
2855 .await
2856 .map_err(ApiError::Database)?;
2857 if inflight.total() >= max_concurrent {
2858 return Err(ApiError::ResourceLimitExceeded(format!(
2859 "Concurrent run limit reached ({}/{}).",
2860 inflight.total(),
2861 max_concurrent,
2862 )));
2863 }
2864 }
2865
2866 let payload = build_smoke_payload(&deployment, &req)?;
2868
2869 let run = TestRun::enqueue(
2871 state.db.pool(),
2872 EnqueueTestRun {
2873 suite_id: deployment.id,
2874 org_id: org_ctx.org_id,
2875 kind: "smoke",
2876 triggered_by: "manual",
2877 triggered_by_user: Some(user_id),
2878 git_ref: None,
2879 git_sha: None,
2880 },
2881 )
2882 .await
2883 .map_err(ApiError::Database)?;
2884
2885 if let Err(e) = crate::run_queue::enqueue(
2887 state.redis.as_ref(),
2888 crate::run_queue::EnqueuedJob {
2889 run_id: run.id,
2890 org_id: run.org_id,
2891 source_id: deployment.id,
2892 kind: "smoke",
2893 payload,
2894 },
2895 )
2896 .await
2897 {
2898 tracing::error!(run_id = %run.id, error = %e, "failed to enqueue smoke run");
2903 }
2904
2905 Ok(Json(run))
2906}
2907
2908fn build_smoke_payload(
2913 deployment: &HostedMock,
2914 req: &TriggerSmokeRunRequest,
2915) -> ApiResult<serde_json::Value> {
2916 let base_url =
2917 deployment.deployment_url.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
2918 ApiError::InvalidRequest(
2919 "Deployment has no public URL — wait for the deploy to finish before running smoke"
2920 .to_string(),
2921 )
2922 })?;
2923 let spec_url =
2924 deployment
2925 .openapi_spec_url
2926 .as_deref()
2927 .filter(|s| !s.is_empty())
2928 .ok_or_else(|| {
2929 ApiError::InvalidRequest(
2930 "Deployment has no OpenAPI spec — upload one before running smoke".to_string(),
2931 )
2932 })?;
2933
2934 let mut payload = serde_json::json!({
2935 "deployment_id": deployment.id,
2936 "base_url": base_url,
2937 "openapi_spec_url": spec_url,
2938 });
2939 let obj = payload
2940 .as_object_mut()
2941 .expect("payload was constructed as an object on the line above");
2942
2943 if let Some(budget) = req.latency_budget_ms {
2944 obj.insert("latency_budget_ms".into(), budget.into());
2945 }
2946 if let Some(methods) = req.methods.as_ref() {
2947 let v = serde_json::to_value(methods)
2950 .map_err(|e| ApiError::InvalidRequest(format!("invalid methods array: {e}")))?;
2951 obj.insert("methods".into(), v);
2952 }
2953
2954 Ok(payload)
2955}
2956
2957#[cfg(test)]
2958mod smoke_trigger_tests {
2959 use super::*;
2960
2961 fn deployment_with(
2962 status: &str,
2963 deployment_url: Option<&str>,
2964 spec_url: Option<&str>,
2965 ) -> HostedMock {
2966 HostedMock {
2971 id: Uuid::new_v4(),
2972 org_id: Uuid::new_v4(),
2973 project_id: None,
2974 name: "test".to_string(),
2975 slug: "test".to_string(),
2976 description: None,
2977 config_json: serde_json::json!({}),
2978 openapi_spec_url: spec_url.map(String::from),
2979 status: status.to_string(),
2980 deployment_url: deployment_url.map(String::from),
2981 internal_url: None,
2982 region: "iad".to_string(),
2983 instance_type: "shared-cpu-1x".to_string(),
2984 health_check_url: None,
2985 last_health_check: None,
2986 health_status: "unknown".to_string(),
2987 error_message: None,
2988 metadata_json: serde_json::json!({}),
2989 created_at: Utc::now(),
2990 updated_at: Utc::now(),
2991 deleted_at: None,
2992 }
2993 }
2994
2995 #[test]
2996 fn build_smoke_payload_uses_deployment_urls() {
2997 let dep = deployment_with(
2998 "running",
2999 Some("https://my-mock.fly.dev"),
3000 Some("https://specs.example.com/abc.json"),
3001 );
3002 let req = TriggerSmokeRunRequest::default();
3003 let payload = build_smoke_payload(&dep, &req).unwrap();
3004 assert_eq!(payload["base_url"], "https://my-mock.fly.dev");
3005 assert_eq!(payload["openapi_spec_url"], "https://specs.example.com/abc.json");
3006 assert_eq!(payload["deployment_id"], serde_json::json!(dep.id));
3007 assert!(payload.get("latency_budget_ms").is_none());
3008 assert!(payload.get("methods").is_none());
3009 }
3010
3011 #[test]
3012 fn build_smoke_payload_passes_overrides() {
3013 let dep = deployment_with("running", Some("https://x"), Some("https://y"));
3014 let req = TriggerSmokeRunRequest {
3015 latency_budget_ms: Some(2000),
3016 methods: Some(vec!["GET".into(), "HEAD".into()]),
3017 };
3018 let payload = build_smoke_payload(&dep, &req).unwrap();
3019 assert_eq!(payload["latency_budget_ms"], 2000);
3020 assert_eq!(payload["methods"], serde_json::json!(["GET", "HEAD"]));
3021 }
3022
3023 #[test]
3024 fn build_smoke_payload_rejects_missing_deployment_url() {
3025 let dep = deployment_with("running", None, Some("https://y"));
3026 let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
3027 match err {
3028 ApiError::InvalidRequest(msg) => assert!(msg.contains("public URL")),
3029 other => panic!("expected InvalidRequest, got {other:?}"),
3030 }
3031 }
3032
3033 #[test]
3034 fn build_smoke_payload_rejects_empty_deployment_url() {
3035 let dep = deployment_with("running", Some(""), Some("https://y"));
3036 assert!(matches!(
3037 build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()),
3038 Err(ApiError::InvalidRequest(_))
3039 ));
3040 }
3041
3042 #[test]
3043 fn build_smoke_payload_rejects_missing_spec_url() {
3044 let dep = deployment_with("running", Some("https://x"), None);
3045 let err = build_smoke_payload(&dep, &TriggerSmokeRunRequest::default()).unwrap_err();
3046 match err {
3047 ApiError::InvalidRequest(msg) => assert!(msg.contains("OpenAPI spec")),
3048 other => panic!("expected InvalidRequest, got {other:?}"),
3049 }
3050 }
3051}