use anyhow::{Context, Result};
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::deployment::flyio::{
FlyioCheck, FlyioClient, FlyioMachineConfig, FlyioPort, FlyioRegistryAuth, FlyioService,
};
use crate::models::{DeploymentLog, DeploymentStatus, HostedMock};
/// Background worker that takes hosted-mock rows in the `pending`/`deploying` state and
/// provisions them, either as dedicated Fly.io apps or as routes on the shared
/// multitenant router.
pub struct DeploymentOrchestrator {
/// Shared Postgres connection pool used for all status, log and URL writes.
db: Arc<PgPool>,
/// Fly.io API client; `None` when no token was supplied, in which case deployments are
/// registered with the multitenant router instead.
flyio_client: Option<FlyioClient>,
/// Fly.io organization slug used when creating apps. Populated from the constructor
/// argument or, failing that, the `FLYIO_ORG_SLUG` environment variable.
flyio_org_slug: Option<String>,
}
impl DeploymentOrchestrator {
/// Builds an orchestrator backed by `db`.
///
/// A Fly.io client is created only when `flyio_token` is `Some`. An explicit
/// `flyio_org_slug` takes precedence; when it is `None` the `FLYIO_ORG_SLUG`
/// environment variable is consulted instead.
pub fn new(
    db: Arc<PgPool>,
    flyio_token: Option<String>,
    flyio_org_slug: Option<String>,
) -> Self {
    let flyio_client = match flyio_token {
        Some(token) => Some(FlyioClient::new(token)),
        None => None,
    };
    let resolved_org_slug = match flyio_org_slug {
        Some(slug) => Some(slug),
        None => std::env::var("FLYIO_ORG_SLUG").ok(),
    };
    Self {
        db,
        flyio_client,
        flyio_org_slug: resolved_org_slug,
    }
}
pub fn start(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = self.process_pending_deployments().await {
error!("Error processing pending deployments: {}", e);
}
}
})
}
async fn process_pending_deployments(&self) -> Result<()> {
let pool = self.db.as_ref();
let deployments = sqlx::query_as::<_, HostedMock>(
r#"
SELECT * FROM hosted_mocks
WHERE status IN ('pending', 'deploying')
AND deployment_url IS NULL
AND deleted_at IS NULL
ORDER BY created_at ASC
LIMIT 10
"#,
)
.fetch_all(pool)
.await
.context("Failed to fetch pending deployments")?;
for deployment in deployments {
if let Err(e) = self.deploy(&deployment).await {
let error_msg = format!("{:#}", e);
error!("Failed to deploy {}: {}", deployment.id, error_msg);
let _ = HostedMock::update_status(
pool,
deployment.id,
DeploymentStatus::Failed,
Some(&format!("Deployment failed: {}", error_msg)),
)
.await;
let _ = DeploymentLog::create(
pool,
deployment.id,
"error",
&format!("Deployment failed: {}", error_msg),
None,
)
.await;
}
}
Ok(())
}
/// Deploys a single mock end to end.
///
/// Marks the row `Deploying`, writes a start entry to the deployment log, provisions
/// on Fly.io when a client is configured (otherwise on the multitenant router), then
/// marks the row `Active` and logs completion.
///
/// # Errors
/// Propagates any status-update, log-write or provisioning failure.
async fn deploy(&self, deployment: &HostedMock) -> Result<()> {
    info!("Deploying mock service: {} ({})", deployment.name, deployment.id);
    let db = self.db.as_ref();
    let mock_id = deployment.id;

    HostedMock::update_status(db, mock_id, DeploymentStatus::Deploying, None)
        .await
        .context("Failed to update deployment status")?;
    DeploymentLog::create(db, mock_id, "info", "Starting deployment", None)
        .await
        .context("Failed to create deployment log")?;

    match self.flyio_client.as_ref() {
        Some(flyio) => self.deploy_to_flyio(flyio, deployment).await?,
        None => self.deploy_to_multitenant_router(deployment).await?,
    }

    HostedMock::update_status(db, mock_id, DeploymentStatus::Active, None)
        .await
        .context("Failed to update deployment status")?;
    DeploymentLog::create(db, mock_id, "info", "Deployment completed successfully", None)
        .await
        .context("Failed to create deployment log")?;

    info!("Successfully deployed mock service: {}", mock_id);
    Ok(())
}
async fn deploy_to_flyio(&self, client: &FlyioClient, deployment: &HostedMock) -> Result<()> {
let pool = self.db.as_ref();
let org_slug = self
.flyio_org_slug
.as_deref()
.ok_or_else(|| anyhow::anyhow!("FLYIO_ORG_SLUG not configured"))?;
let app_name = deployment.fly_app_name();
DeploymentLog::create(
pool,
deployment.id,
"info",
&format!("Creating Fly.io app: {}", app_name),
None,
)
.await?;
let is_new_app;
let _app = match client.get_app(&app_name).await {
Ok(app) => {
DeploymentLog::create(
pool,
deployment.id,
"info",
"Using existing Fly.io app",
None,
)
.await?;
is_new_app = false;
app
}
Err(_) => {
let app = client
.create_app(&app_name, org_slug)
.await
.context("Failed to create Fly.io app")?;
is_new_app = true;
app
}
};
if is_new_app {
client.allocate_ips(&app_name).await.context("Failed to allocate public IPs")?;
DeploymentLog::create(pool, deployment.id, "info", "Allocated public IPs", None)
.await?;
}
let mut env = HashMap::new();
env.insert("MOCKFORGE_DEPLOYMENT_ID".to_string(), deployment.id.to_string());
env.insert("MOCKFORGE_ORG_ID".to_string(), deployment.org_id.to_string());
env.insert("MOCKFORGE_CONFIG".to_string(), serde_json::to_string(&deployment.config_json)?);
env.insert("PORT".to_string(), "3000".to_string());
if let Some(ref spec_url) = deployment.openapi_spec_url {
env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
}
let enabled_protocols = deployment.enabled_protocols();
for protocol in &enabled_protocols {
if let Some((key, value)) = protocol.enable_env() {
env.insert(key.to_string(), value);
}
}
if enabled_protocols.contains(&crate::models::Protocol::Kafka) {
let advertised_host = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
format!("{}.{}", deployment.slug, domain)
} else {
format!("{}.fly.dev", app_name)
};
env.insert("MOCKFORGE_KAFKA_ADVERTISED_HOST".to_string(), advertised_host);
env.insert("MOCKFORGE_KAFKA_ADVERTISED_PORT".to_string(), "9092".to_string());
}
if let Ok(jwt_secret) = std::env::var("JWT_SECRET") {
if let Ok(ingest_base) = std::env::var("MOCKFORGE_LOG_INGEST_BASE_URL") {
let trimmed_base = ingest_base.trim_end_matches('/');
let token = mockforge_registry_core::auth::create_deployment_ingest_token(
deployment.id,
&jwt_secret,
30, )
.ok();
if let Some(token) = token {
env.insert(
"MOCKFORGE_LOG_INGEST_URL".to_string(),
format!(
"{}/api/v1/hosted-mocks/{}/log-ingest",
trimmed_base, deployment.id
),
);
env.insert("MOCKFORGE_LOG_INGEST_TOKEN".to_string(), token.clone());
env.insert(
"MOCKFORGE_CAPTURE_INGEST_URL".to_string(),
format!(
"{}/api/v1/hosted-mocks/{}/captures/ingest",
trimmed_base, deployment.id
),
);
env.insert("MOCKFORGE_CAPTURE_INGEST_TOKEN".to_string(), token);
}
}
}
if let Ok(otlp_endpoint) = std::env::var("MOCKFORGE_OTLP_INGEST_ENDPOINT") {
if !otlp_endpoint.trim().is_empty() {
env.insert("MOCKFORGE_OTLP_ENDPOINT".to_string(), otlp_endpoint);
env.insert(
"MOCKFORGE_OTLP_SERVICE_NAME".to_string(),
format!("hosted-mock/{}", deployment.slug),
);
}
}
if let Some(upstream) = deployment.upstream_url() {
env.insert("MOCKFORGE_PROXY_UPSTREAM".to_string(), upstream);
}
let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
.unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
let mut services = vec![FlyioService {
protocol: "tcp".to_string(),
internal_port: 3000,
ports: vec![
FlyioPort {
port: 80,
handlers: vec!["http".to_string()],
},
FlyioPort {
port: 443,
handlers: vec!["tls".to_string(), "http".to_string()],
},
],
}];
let mut bound_internal_ports: std::collections::HashSet<u16> =
std::collections::HashSet::new();
bound_internal_ports.insert(3000);
for protocol in deployment.enabled_protocols() {
let Some(internal) = protocol.internal_port() else {
continue; };
if !bound_internal_ports.insert(internal) {
continue;
}
let public = protocol.public_port().unwrap_or(internal);
let handlers: Vec<String> =
protocol.fly_handlers().iter().map(|s| (*s).to_string()).collect();
services.push(FlyioService {
protocol: "tcp".to_string(),
internal_port: internal,
ports: vec![FlyioPort {
port: public,
handlers,
}],
});
}
let mut checks = HashMap::new();
checks.insert(
"alive".to_string(),
FlyioCheck {
check_type: "http".to_string(),
port: 3000,
grace_period: "10s".to_string(),
interval: "15s".to_string(),
method: "GET".to_string(),
timeout: "2s".to_string(),
tls_skip_verify: false,
path: Some("/health/live".to_string()),
},
);
let machine_config = FlyioMachineConfig {
image,
env,
services,
checks: Some(checks),
};
DeploymentLog::create(pool, deployment.id, "info", "Creating Fly.io machine", None).await?;
let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
std::env::var("DOCKER_REGISTRY_SERVER"),
std::env::var("DOCKER_REGISTRY_USERNAME"),
std::env::var("DOCKER_REGISTRY_PASSWORD"),
) {
Some(FlyioRegistryAuth {
server,
username,
password,
})
} else if machine_config.image.starts_with("registry.fly.io/") {
Some(FlyioRegistryAuth {
server: "registry.fly.io".to_string(),
username: "x".to_string(),
password: client.api_token().to_string(),
})
} else if machine_config.image.starts_with("ghcr.io/") {
if let Ok(token) = std::env::var("GHCR_TOKEN") {
Some(FlyioRegistryAuth {
server: "ghcr.io".to_string(),
username: std::env::var("GHCR_USERNAME")
.unwrap_or_else(|_| "mockforge".to_string()),
password: token,
})
} else {
tracing::warn!(
"GHCR image '{}' requires GHCR_TOKEN env var for authentication",
machine_config.image
);
None
}
} else {
None
};
let machine = client
.create_machine(&app_name, machine_config, &deployment.region, registry_auth)
.await
.context("Failed to create Fly.io machine")?;
let deployment_url = if let Ok(domain) = std::env::var("MOCKFORGE_MOCKS_DOMAIN") {
format!("https://{}.{}", deployment.slug, domain)
} else {
format!("https://{}.fly.dev", app_name)
};
let internal_url = format!("https://{}.fly.dev", app_name);
let health_check_url = format!("https://{}.fly.dev/health/live", app_name);
sqlx::query(
r#"
UPDATE hosted_mocks
SET
deployment_url = $1,
internal_url = $2,
health_check_url = $3,
metadata_json = jsonb_set(
COALESCE(metadata_json, '{}'::jsonb),
'{flyio_machine_id}',
to_jsonb($4::text)
),
updated_at = NOW()
WHERE id = $5
"#,
)
.bind(&deployment_url)
.bind(&internal_url)
.bind(&health_check_url)
.bind(&machine.id)
.bind(deployment.id)
.execute(pool)
.await
.context("Failed to update deployment URLs")?;
DeploymentLog::create(
pool,
deployment.id,
"info",
&format!("Deployment URL: {}", deployment_url),
None,
)
.await?;
Ok(())
}
pub fn db(&self) -> &PgPool {
&self.db
}
pub async fn redeploy(&self, deployment: &HostedMock) -> Result<()> {
info!("Redeploying mock service: {} ({})", deployment.name, deployment.id);
let pool = self.db.as_ref();
HostedMock::update_status(pool, deployment.id, DeploymentStatus::Deploying, None)
.await
.context("Failed to update deployment status")?;
DeploymentLog::create(pool, deployment.id, "info", "Starting redeployment", None)
.await
.context("Failed to create deployment log")?;
if let Some(ref flyio_client) = self.flyio_client {
self.redeploy_to_flyio(flyio_client, deployment).await?;
} else {
self.deploy_to_multitenant_router(deployment).await?;
}
HostedMock::update_status(pool, deployment.id, DeploymentStatus::Active, None)
.await
.context("Failed to update deployment status")?;
DeploymentLog::create(
pool,
deployment.id,
"info",
"Redeployment completed successfully",
None,
)
.await
.context("Failed to create deployment log")?;
info!("Successfully redeployed mock service: {}", deployment.id);
Ok(())
}
async fn redeploy_to_flyio(&self, client: &FlyioClient, deployment: &HostedMock) -> Result<()> {
let pool = self.db.as_ref();
let machine_id = deployment
.metadata_json
.get("flyio_machine_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("No Fly.io machine ID found in deployment metadata"))?;
let app_name = format!(
"mockforge-{}-{}",
deployment
.org_id
.to_string()
.replace('-', "")
.chars()
.take(8)
.collect::<String>(),
deployment.slug
);
DeploymentLog::create(
pool,
deployment.id,
"info",
&format!("Updating Fly.io machine {} in app {}", machine_id, app_name),
None,
)
.await?;
let mut env = HashMap::new();
env.insert("MOCKFORGE_DEPLOYMENT_ID".to_string(), deployment.id.to_string());
env.insert("MOCKFORGE_ORG_ID".to_string(), deployment.org_id.to_string());
env.insert("MOCKFORGE_CONFIG".to_string(), serde_json::to_string(&deployment.config_json)?);
env.insert("PORT".to_string(), "3000".to_string());
if let Some(ref spec_url) = deployment.openapi_spec_url {
env.insert("MOCKFORGE_OPENAPI_SPEC_URL".to_string(), spec_url.clone());
}
let image = std::env::var("MOCKFORGE_DOCKER_IMAGE")
.unwrap_or_else(|_| "ghcr.io/saasy-solutions/mockforge:latest".to_string());
let services = vec![FlyioService {
protocol: "tcp".to_string(),
internal_port: 3000,
ports: vec![
FlyioPort {
port: 80,
handlers: vec!["http".to_string()],
},
FlyioPort {
port: 443,
handlers: vec!["tls".to_string(), "http".to_string()],
},
],
}];
let mut checks = HashMap::new();
checks.insert(
"alive".to_string(),
FlyioCheck {
check_type: "http".to_string(),
port: 3000,
grace_period: "10s".to_string(),
interval: "15s".to_string(),
method: "GET".to_string(),
timeout: "2s".to_string(),
tls_skip_verify: false,
path: Some("/health/live".to_string()),
},
);
let machine_config = FlyioMachineConfig {
image,
env,
services,
checks: Some(checks),
};
let registry_auth = if let (Ok(server), Ok(username), Ok(password)) = (
std::env::var("DOCKER_REGISTRY_SERVER"),
std::env::var("DOCKER_REGISTRY_USERNAME"),
std::env::var("DOCKER_REGISTRY_PASSWORD"),
) {
Some(FlyioRegistryAuth {
server,
username,
password,
})
} else if machine_config.image.starts_with("registry.fly.io/") {
Some(FlyioRegistryAuth {
server: "registry.fly.io".to_string(),
username: "x".to_string(),
password: client.api_token().to_string(),
})
} else if machine_config.image.starts_with("ghcr.io/") {
if let Ok(token) = std::env::var("GHCR_TOKEN") {
Some(FlyioRegistryAuth {
server: "ghcr.io".to_string(),
username: std::env::var("GHCR_USERNAME")
.unwrap_or_else(|_| "mockforge".to_string()),
password: token,
})
} else {
tracing::warn!(
"GHCR image '{}' requires GHCR_TOKEN env var for authentication",
machine_config.image
);
None
}
} else {
None
};
client
.update_machine(&app_name, machine_id, machine_config, registry_auth)
.await
.context("Failed to update Fly.io machine")?;
DeploymentLog::create(pool, deployment.id, "info", "Machine updated and restarting", None)
.await?;
Ok(())
}
/// Registers `deployment` with the shared multitenant router.
///
/// No infrastructure is created: the method derives the public URL
/// (`{base}/mocks/{org_id}/{slug}`) and its `/health/live` check URL, stores both on the
/// `hosted_mocks` row and writes a deployment log entry. The base comes from
/// `MOCKFORGE_BASE_URL`, defaulting to `https://mocks.mockforge.dev`.
///
/// # Errors
/// Fails when the database update or the log write fails.
async fn deploy_to_multitenant_router(&self, deployment: &HostedMock) -> Result<()> {
    let pool = self.db.as_ref();
    let base_url = std::env::var("MOCKFORGE_BASE_URL")
        .unwrap_or_else(|_| "https://mocks.mockforge.dev".to_string());
    // Tolerate a trailing slash in the configured base so the stored URL never
    // contains `//mocks/...`.
    let base_url = base_url.trim_end_matches('/');
    let deployment_url =
        format!("{}/mocks/{}/{}", base_url, deployment.org_id, deployment.slug);
    let health_check_url = format!("{}/health/live", deployment_url);
    sqlx::query(
        r#"
        UPDATE hosted_mocks
        SET
        deployment_url = $1,
        health_check_url = $2,
        updated_at = NOW()
        WHERE id = $3
        "#,
    )
    .bind(&deployment_url)
    .bind(&health_check_url)
    .bind(deployment.id)
    .execute(pool)
    .await
    .context("Failed to update deployment URLs")?;
    DeploymentLog::create(
        pool,
        deployment.id,
        "info",
        &format!("Deployed to multitenant router: {}", deployment_url),
        None,
    )
    .await?;
    Ok(())
}
/// Tears down a deployment and soft-deletes its row.
///
/// Marks the row `Deleting`, then, when a Fly.io client is configured and a machine ID
/// is recorded in the metadata, deletes the machine and its app. Fly.io failures are
/// logged as warnings and do not abort the deletion. Finally sets `deleted_at`.
///
/// # Errors
/// Fails when the deployment does not exist or when a database write fails.
pub async fn delete_deployment(&self, deployment_id: Uuid) -> Result<()> {
    let pool = self.db.as_ref();
    let deployment = HostedMock::find_by_id(pool, deployment_id)
        .await
        .context("Failed to find deployment")?
        .ok_or_else(|| anyhow::anyhow!("Deployment not found"))?;
    HostedMock::update_status(pool, deployment_id, DeploymentStatus::Deleting, None)
        .await
        .context("Failed to update deployment status")?;
    DeploymentLog::create(pool, deployment_id, "info", "Starting deletion", None)
        .await
        .context("Failed to create deployment log")?;

    if let Some(ref flyio_client) = self.flyio_client {
        // NOTE(review): an app whose machine was never created (deploy failed after app
        // creation) has no machine ID and is therefore not removed here — confirm
        // whether such apps should be cleaned up as well.
        if let Some(machine_id) =
            deployment.metadata_json.get("flyio_machine_id").and_then(|v| v.as_str())
        {
            // Same naming helper as the deploy path, so we delete the app that was
            // actually created for this deployment.
            let app_name = deployment.fly_app_name();
            if let Err(e) = flyio_client.delete_machine(&app_name, machine_id).await {
                warn!("Failed to delete Fly.io machine: {}", e);
            }
            if let Err(e) = flyio_client.delete_app(&app_name).await {
                warn!("Failed to delete Fly.io app {}: {}", app_name, e);
            }
        }
    }

    sqlx::query(
        r#"
        UPDATE hosted_mocks
        SET deleted_at = NOW()
        WHERE id = $1
        "#,
    )
    .bind(deployment_id)
    .execute(pool)
    .await
    .context("Failed to mark deployment as deleted")?;
    DeploymentLog::create(pool, deployment_id, "info", "Deletion completed", None)
        .await
        .context("Failed to create deployment log")?;
    Ok(())
}
}