use anyhow::{bail, Context, Result};
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;
use crate::db::models::{Deployment, DeploymentStatus, TerminationReason};
use crate::server::deployment::state_machine;
/// Column values for inserting a new `deployments` row (see `create`).
///
/// String fields are borrowed so callers don't have to allocate; `Option`
/// fields map to NULLable columns.
pub struct CreateDeploymentParams<'a> {
/// Human-facing deployment identifier (distinct from the `id` UUID primary key).
pub deployment_id: &'a str,
pub project_id: Uuid,
pub created_by_id: Uuid,
/// Initial status; persisted via its `to_string()` text form.
pub status: DeploymentStatus,
pub image: Option<&'a str>,
pub image_digest: Option<&'a str>,
/// Set when this deployment is a rollback of an earlier deployment.
pub rolled_back_from_deployment_id: Option<Uuid>,
pub deployment_group: &'a str,
/// `None` means the deployment never expires (see `find_expired`).
pub expires_at: Option<DateTime<Utc>>,
pub http_port: i32,
pub is_active: bool,
}
/// Returns every deployment belonging to `project_id`, newest first.
pub async fn list_for_project(pool: &PgPool, project_id: Uuid) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE project_id = $1
ORDER BY created_at DESC
"#,
project_id
)
.fetch_all(pool)
.await
.context("Failed to list deployments for project")?;
Ok(deployments)
}
/// Fetches a batch of deployments by primary key in a single query,
/// returning them keyed by `id`.
///
/// IDs with no matching row are simply absent from the returned map, so the
/// map may be smaller than `deployment_ids`.
pub async fn get_deployments_batch(
pool: &PgPool,
deployment_ids: &[Uuid],
) -> Result<std::collections::HashMap<Uuid, Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE id = ANY($1)
"#,
deployment_ids
)
.fetch_all(pool)
.await
.context("Failed to fetch deployments batch")?;
// Idiomatic map construction; behaviorally identical to the former
// explicit `HashMap::new()` + insert loop, and pre-sizes from size_hint.
Ok(deployments.into_iter().map(|d| (d.id, d)).collect())
}
/// Looks up a single deployment by its `id` primary key.
///
/// Returns `Ok(None)` when no row matches.
pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<Deployment>> {
let deployment = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE id = $1
"#,
id
)
.fetch_optional(pool)
.await
// Every other query in this module attaches context to its error; this one
// previously used a bare `?`, losing the attribution. Made consistent.
.context("Failed to find deployment by id")?;
Ok(deployment)
}
/// Looks up a deployment by its human-facing `deployment_id`, scoped to a
/// project (deployment_id is only unique per project — TODO confirm against
/// the schema's unique constraints).
pub async fn find_by_deployment_id(
pool: &PgPool,
deployment_id: &str,
project_id: Uuid,
) -> Result<Option<Deployment>> {
let deployment = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE deployment_id = $1 AND project_id = $2
"#,
deployment_id,
project_id
)
.fetch_optional(pool)
.await
.context("Failed to find deployment by deployment_id")?;
Ok(deployment)
}
/// Inserts a new deployment row from `params` and returns the full row as
/// stored (including DB-generated `id`, `created_at`, `updated_at`).
pub async fn create(pool: &PgPool, params: CreateDeploymentParams<'_>) -> Result<Deployment> {
// Status is persisted in its text form, matching the other queries here
// that bind `to_string()` output against the status column.
let status_str = params.status.to_string();
let deployment = sqlx::query_as!(
Deployment,
r#"
INSERT INTO deployments (deployment_id, project_id, created_by_id, status, image, image_digest, rolled_back_from_deployment_id, deployment_group, expires_at, http_port, is_active)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
params.deployment_id,
params.project_id,
params.created_by_id,
status_str,
params.image,
params.image_digest,
params.rolled_back_from_deployment_id,
params.deployment_group,
params.expires_at,
params.http_port,
params.is_active
)
.fetch_one(pool)
.await
.context("Failed to create deployment")?;
Ok(deployment)
}
/// Transitions a deployment to `status`, enforcing the state machine.
///
/// Reads the current row, validates `current -> status` via
/// `state_machine::validate_transition`, then applies the UPDATE guarded by
/// `status = <current>` so a writer that raced between our read and our write
/// cannot slip past the validation — the guarded UPDATE matches zero rows and
/// we bail with the concurrency error instead. (Previously the UPDATE had no
/// status guard, so the "modified concurrently" branch below was unreachable
/// in practice.)
///
/// `deploying_started_at` is stamped exactly once: only on the first
/// transition into 'Deploying' while the column is still NULL.
pub async fn update_status(
pool: &PgPool,
id: Uuid,
status: DeploymentStatus,
) -> Result<Deployment> {
let current = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE id = $1
"#,
id
)
.fetch_optional(pool)
.await
.context("Failed to fetch deployment for status update")?;
let current = current.ok_or_else(|| anyhow::anyhow!("Deployment not found"))?;
// Fix: this line previously read `validate_transition(¤t.status, ...)` —
// a mojibake of `&current.status` (the `&curr` prefix was HTML-entity-mangled
// into `¤`), which does not compile.
state_machine::validate_transition(&current.status, &status)?;
let status_str = status.to_string();
// Bind the pre-validated status as the optimistic-concurrency guard; text
// comparison against the status column matches `find_by_status`'s usage.
let current_status_str = current.status.to_string();
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET status = $2,
deploying_started_at = CASE
WHEN $2 = 'Deploying' AND deploying_started_at IS NULL THEN NOW()
ELSE deploying_started_at
END
WHERE id = $1 AND status = $3
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
"#,
id,
status_str,
current_status_str
)
.fetch_optional(pool)
.await
.context("Failed to execute deployment status update")?;
match deployment {
Some(d) => Ok(d),
None => {
// Zero rows means the row changed (or vanished) after our read.
tracing::warn!(
"UPDATE returned 0 rows for deployment {} (transition {} -> {}), but validation passed",
current.deployment_id,
current.status,
status
);
bail!("Failed to update deployment status: deployment may have been modified concurrently");
}
}
}
/// Marks a deployment 'Failed', stamping `completed_at` and recording the
/// failure reason in `error_message`.
///
/// NOTE(review): unlike `update_status`, this bypasses
/// `state_machine::validate_transition` — presumably intentional so failure
/// can be recorded from any state; confirm.
pub async fn mark_failed(pool: &PgPool, id: Uuid, error_message: &str) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET status = 'Failed', completed_at = NOW(), error_message = $2
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
"#,
id,
error_message
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as failed")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Returns up to `limit` deployments whose status is not terminal, oldest
/// update first (so the reconciler services the stalest rows before others).
/// Terminality is decided by the DB function `is_terminal`, which the tests
/// below pin against the Rust state machine.
pub async fn find_non_terminal(pool: &PgPool, limit: i64) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE NOT is_terminal(status)
ORDER BY updated_at ASC
LIMIT $1
"#,
limit
)
.fetch_all(pool)
.await
.context("Failed to find non-terminal deployments")?;
Ok(deployments)
}
#[cfg(feature = "backend")]
/// Returns all deployments currently in `status`, newest first.
pub async fn find_by_status(pool: &PgPool, status: DeploymentStatus) -> Result<Vec<Deployment>> {
// Compared as text; matches how statuses are written elsewhere in this module.
let status_str = status.to_string();
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE status = $1
ORDER BY created_at DESC
"#,
status_str
)
.fetch_all(pool)
.await
.context("Failed to find deployments by status")?;
Ok(deployments)
}
#[cfg(feature = "backend")]
/// Replaces the deployment's `controller_metadata` JSON blob wholesale and
/// returns the updated row.
pub async fn update_controller_metadata(
pool: &PgPool,
id: Uuid,
metadata: &serde_json::Value,
) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET controller_metadata = $2
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
"#,
id,
metadata
)
.fetch_one(pool)
.await
.context("Failed to update controller metadata")?;
Ok(deployment)
}
/// Argument-order-swapped convenience alias for `find_by_deployment_id`.
pub async fn find_by_project_and_deployment_id(
pool: &PgPool,
project_id: Uuid,
deployment_id: &str,
) -> Result<Option<Deployment>> {
find_by_deployment_id(pool, deployment_id, project_id).await
}
#[cfg(feature = "backend")]
/// Marks a deployment terminally 'Cancelled', recording the termination
/// reason and resetting `controller_metadata` to an empty object.
pub async fn mark_cancelled(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Cancelled',
termination_reason = 'Cancelled',
controller_metadata = '{}',
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as cancelled")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Marks a deployment terminally 'Stopped' (user-initiated), clearing its
/// controller metadata.
pub async fn mark_stopped(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Stopped',
termination_reason = 'UserStopped',
controller_metadata = '{}',
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as stopped")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Marks a deployment terminally 'Superseded' (replaced by a newer one),
/// clearing its controller metadata.
pub async fn mark_superseded(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Superseded',
termination_reason = 'Superseded',
controller_metadata = '{}',
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as superseded")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Marks a deployment terminally 'Expired' (its `expires_at` passed),
/// clearing its controller metadata.
pub async fn mark_expired(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Expired',
termination_reason = 'Expired',
controller_metadata = '{}',
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as expired")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Marks a deployment 'Healthy' and clears any previous `error_message`
/// (e.g. one left behind by an earlier 'Unhealthy' spell).
pub async fn mark_healthy(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Healthy',
error_message = NULL,
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as healthy")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Marks a deployment 'Unhealthy', storing `reason` in `error_message`.
pub async fn mark_unhealthy(pool: &PgPool, id: Uuid, reason: String) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Unhealthy',
error_message = $2,
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id,
reason
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as unhealthy")?;
Ok(deployment)
}
/// Moves a deployment into the 'Terminating' cleanup state, recording why
/// (binds `reason` directly as the `TerminationReason` DB type, unlike the
/// string-literal reasons in the terminal `mark_*` helpers).
pub async fn mark_terminating(
pool: &PgPool,
id: Uuid,
reason: TerminationReason,
) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Terminating',
termination_reason = $2,
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id,
reason as TerminationReason
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as terminating")?;
Ok(deployment)
}
/// Moves a deployment into the 'Cancelling' cleanup state with termination
/// reason 'Cancelled' ('Cancelling' is the in-flight counterpart of the
/// terminal 'Cancelled' state set by `mark_cancelled`).
pub async fn mark_cancelling(pool: &PgPool, id: Uuid) -> Result<Deployment> {
let deployment = sqlx::query_as!(
Deployment,
r#"
UPDATE deployments
SET
status = 'Cancelling',
termination_reason = 'Cancelled',
updated_at = NOW()
WHERE id = $1
RETURNING
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
"#,
id
)
.fetch_one(pool)
.await
.context("Failed to mark deployment as cancelling")?;
Ok(deployment)
}
/// Flags a deployment for the reconciler (see `find_needing_reconcile`).
/// Idempotent: re-flagging an already-flagged row only bumps `updated_at`.
pub async fn mark_needs_reconcile(pool: &PgPool, id: Uuid) -> Result<()> {
sqlx::query!(
"UPDATE deployments SET needs_reconcile = TRUE, updated_at = NOW() WHERE id = $1",
id
)
.execute(pool)
.await
.context("Failed to mark deployment as needing reconciliation")?;
Ok(())
}
/// Clears the reconcile flag once the reconciler has handled the deployment.
pub async fn clear_needs_reconcile(pool: &PgPool, id: Uuid) -> Result<()> {
sqlx::query!(
"UPDATE deployments SET needs_reconcile = FALSE, updated_at = NOW() WHERE id = $1",
id
)
.execute(pool)
.await
.context("Failed to clear needs_reconcile flag")?;
Ok(())
}
/// Returns up to `limit` flagged deployments for the reconciler, stalest
/// first. Only 'Healthy'/'Unhealthy' rows qualify — a row flagged while in
/// any other status is deliberately skipped here.
pub async fn find_needing_reconcile(pool: &PgPool, limit: i64) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE needs_reconcile = TRUE
AND status IN ('Healthy', 'Unhealthy')
ORDER BY updated_at ASC
LIMIT $1
"#,
limit
)
.fetch_all(pool)
.await
.context("Failed to find deployments needing reconciliation")?;
Ok(deployments)
}
#[cfg(feature = "backend")]
/// Returns the newest 'Healthy' deployment in a project/group, if any.
///
/// NOTE(review): "active" here means status = 'Healthy'; it does NOT consult
/// the `is_active` flag, unlike `find_active_deployment_for_group` below —
/// confirm both semantics are intended.
pub async fn find_active_for_project_and_group(
pool: &PgPool,
project_id: Uuid,
group: &str,
) -> Result<Option<Deployment>> {
let deployment = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1
AND deployment_group = $2
AND status = 'Healthy'
ORDER BY created_at DESC
LIMIT 1
"#,
project_id,
group
)
.fetch_optional(pool)
.await
.context("Failed to find active deployment for project and group")?;
Ok(deployment)
}
/// Returns all non-terminal deployments in a project/group, newest first
/// (terminality via the DB `is_terminal` function).
pub async fn find_non_terminal_for_project_and_group(
pool: &PgPool,
project_id: Uuid,
group: &str,
) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1
AND deployment_group = $2
AND NOT is_terminal(status)
ORDER BY created_at DESC
"#,
project_id,
group
)
.fetch_all(pool)
.await
.context("Failed to find non-terminal deployments for project and group")?;
Ok(deployments)
}
/// Returns the deployment carrying the `is_active = TRUE` flag in a
/// project/group (the flag is maintained as a singleton per group by
/// `mark_as_active`, hence the bare LIMIT 1 without an ORDER BY).
pub async fn find_active_deployment_for_group(
pool: &PgPool,
project_id: Uuid,
group: &str,
) -> Result<Option<Deployment>> {
let deployment = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1
AND deployment_group = $2
AND is_active = TRUE
LIMIT 1
"#,
project_id,
group
)
.fetch_optional(pool)
.await
.context("Failed to find active deployment for project and group")?;
Ok(deployment)
}
/// Returns the most recently created deployment in a project/group,
/// regardless of its status.
pub async fn find_last_for_project_and_group(
pool: &PgPool,
project_id: Uuid,
group: &str,
) -> Result<Option<Deployment>> {
let deployment = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1
AND deployment_group = $2
ORDER BY created_at DESC
LIMIT 1
"#,
project_id,
group
)
.fetch_optional(pool)
.await
.context("Failed to find last deployment for project and group")?;
Ok(deployment)
}
#[cfg(feature = "backend")]
/// Returns up to `limit` non-terminal deployments whose `expires_at` has
/// passed, longest-expired first. Rows with NULL `expires_at` never expire.
pub async fn find_expired(pool: &PgPool, limit: i64) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE expires_at IS NOT NULL
AND expires_at <= NOW()
AND NOT is_terminal(status)
ORDER BY expires_at ASC
LIMIT $1
"#,
limit
)
.fetch_all(pool)
.await
.context("Failed to find expired deployments")?;
Ok(deployments)
}
/// Pages through a project's deployments, newest first, optionally filtered
/// to one deployment group.
///
/// `limit` defaults to 10 and `offset` to 0 when `None`. Two query branches
/// are needed because sqlx macros require a fixed SQL string per invocation.
pub async fn list_for_project_and_group(
pool: &PgPool,
project_id: Uuid,
group: Option<&str>,
limit: Option<i64>,
offset: Option<i64>,
) -> Result<Vec<Deployment>> {
let limit_value = limit.unwrap_or(10);
let offset_value = offset.unwrap_or(0);
let deployments = if let Some(g) = group {
sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1 AND deployment_group = $2
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
"#,
project_id,
g,
limit_value,
offset_value
)
.fetch_all(pool)
.await?
} else {
sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
termination_reason as "termination_reason: _",
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
created_at, updated_at
FROM deployments
WHERE project_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#,
project_id,
limit_value,
offset_value
)
.fetch_all(pool)
.await?
};
Ok(deployments)
}
/// Lists a project's deployment groups that are still "live": the 'default'
/// group whenever it has any deployments at all, plus any other group that
/// still has at least one non-terminal deployment. Sorted by group name.
pub async fn get_active_deployment_groups(pool: &PgPool, project_id: Uuid) -> Result<Vec<String>> {
let groups = sqlx::query_scalar!(
r#"
SELECT DISTINCT deployment_group
FROM deployments
WHERE project_id = $1
AND (
-- Always include default group if it has any deployments
deployment_group = 'default'
OR
-- Include other groups only if they have non-terminal deployments
(deployment_group != 'default' AND NOT is_terminal(status))
)
ORDER BY deployment_group
"#,
project_id
)
.fetch_all(pool)
.await?;
Ok(groups)
}
/// Lists every deployment group a project has ever used, ordered by
/// priority: 'default' first, then groups with at least one 'Healthy'
/// deployment, then the rest; ties broken alphabetically.
pub async fn get_all_deployment_groups(pool: &PgPool, project_id: Uuid) -> Result<Vec<String>> {
let groups = sqlx::query_scalar!(
r#"
WITH group_priority AS (
SELECT
deployment_group,
CASE
WHEN deployment_group = 'default' THEN 0
WHEN EXISTS (
SELECT 1 FROM deployments d2
WHERE d2.project_id = $1
AND d2.deployment_group = deployments.deployment_group
AND d2.status = 'Healthy'
) THEN 1
ELSE 2
END as priority
FROM deployments
WHERE project_id = $1
GROUP BY deployment_group
)
SELECT deployment_group
FROM group_priority
ORDER BY priority, deployment_group
"#,
project_id
)
.fetch_all(pool)
.await?;
Ok(groups)
}
/// Atomically makes `deployment_id` the single active deployment of its
/// project/group: inside one transaction, demotes whichever row currently
/// holds `is_active = TRUE`, then promotes the given row. Maintains the
/// at-most-one-active-per-group invariant that
/// `find_active_deployment_for_group` relies on.
pub async fn mark_as_active(
pool: &PgPool,
deployment_id: Uuid,
project_id: Uuid,
deployment_group: &str,
) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query!(
"UPDATE deployments
SET is_active = FALSE, updated_at = NOW()
WHERE project_id = $1 AND deployment_group = $2 AND is_active = TRUE",
project_id,
deployment_group
)
.execute(&mut *tx)
.await?;
sqlx::query!(
"UPDATE deployments
SET is_active = TRUE, updated_at = NOW()
WHERE id = $1",
deployment_id
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
#[allow(dead_code)]
/// Clears a deployment's `is_active` flag, leaving its group with no active
/// deployment. `deployment_id` here is the UUID primary key (`id` column),
/// matching `mark_as_active`'s first argument.
pub async fn mark_as_inactive(pool: &PgPool, deployment_id: Uuid) -> Result<()> {
sqlx::query!(
"UPDATE deployments
SET is_active = FALSE, updated_at = NOW()
WHERE id = $1",
deployment_id
)
.execute(pool)
.await?;
Ok(())
}
/// Returns every deployment flagged `is_active = TRUE` across all of a
/// project's groups (at most one per group if the `mark_as_active` invariant
/// holds), grouped by deployment_group and newest first within a group.
pub async fn get_active_deployments_for_project(
pool: &PgPool,
project_id: Uuid,
) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
completed_at, error_message, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE project_id = $1 AND is_active = TRUE
ORDER BY deployment_group, created_at DESC
"#,
project_id
)
.fetch_all(pool)
.await?;
Ok(deployments)
}
#[cfg(feature = "backend")]
/// Finds up to `limit` deployments stuck in a pre-push state ('Pending',
/// 'Building', 'Pushing') since before `threshold`, excluding any status the
/// DB `is_protected` function shields from cleanup.
pub async fn find_stuck_pre_pushed_before(
pool: &PgPool,
threshold: DateTime<Utc>,
limit: i64,
) -> Result<Vec<Deployment>> {
let deployments = sqlx::query_as!(
Deployment,
r#"
SELECT
id, deployment_id, project_id, created_by_id,
status as "status: DeploymentStatus",
deployment_group, expires_at,
error_message, completed_at, build_logs,
controller_metadata as "controller_metadata: serde_json::Value",
image, image_digest, rolled_back_from_deployment_id,
http_port, needs_reconcile, is_active,
deploying_started_at,
termination_reason as "termination_reason: _",
created_at, updated_at
FROM deployments
WHERE status IN ('Pending', 'Building', 'Pushing')
AND created_at < $1
AND NOT is_protected(status)
LIMIT $2
"#,
threshold,
limit
)
.fetch_all(pool)
.await
.context("Failed to find stuck pre-pushed deployments")?;
Ok(deployments)
}
#[cfg(test)]
mod tests {
// These are `#[sqlx::test]` tests: each runs against a fresh database
// provisioned by the sqlx test harness. Their main job is to pin the SQL
// helper functions (is_terminal / is_cancellable / is_active / is_protected)
// to the Rust state machine so the two cannot drift apart silently.
use super::*;
use crate::server::deployment::state_machine;
/// Maps a status's text form back to the enum; panics on unknown input so a
/// newly added status forces these tables to be updated.
fn str_to_status(s: &str) -> DeploymentStatus {
match s {
"Pending" => DeploymentStatus::Pending,
"Building" => DeploymentStatus::Building,
"Pushing" => DeploymentStatus::Pushing,
"Pushed" => DeploymentStatus::Pushed,
"Deploying" => DeploymentStatus::Deploying,
"Healthy" => DeploymentStatus::Healthy,
"Unhealthy" => DeploymentStatus::Unhealthy,
"Cancelling" => DeploymentStatus::Cancelling,
"Cancelled" => DeploymentStatus::Cancelled,
"Terminating" => DeploymentStatus::Terminating,
"Stopped" => DeploymentStatus::Stopped,
"Superseded" => DeploymentStatus::Superseded,
"Failed" => DeploymentStatus::Failed,
"Expired" => DeploymentStatus::Expired,
_ => panic!("Unknown status: {}", s),
}
}
/// DB is_terminal() must agree with state_machine::is_terminal for every status.
#[sqlx::test]
async fn db_is_terminal_matches_rust_is_terminal(pool: PgPool) {
let statuses = vec![
("Pending", false),
("Building", false),
("Pushing", false),
("Pushed", false),
("Deploying", false),
("Healthy", false),
("Unhealthy", false),
("Cancelling", false),
("Terminating", false),
("Cancelled", true),
("Stopped", true),
("Superseded", true),
("Failed", true),
("Expired", true),
];
for (status_str, expected) in statuses {
let result: bool = sqlx::query_scalar("SELECT is_terminal($1)")
.bind(status_str)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
result, expected,
"is_terminal({}) returned {} but expected {}",
status_str, result, expected
);
let status = str_to_status(status_str);
assert_eq!(
state_machine::is_terminal(&status),
expected,
"Rust is_terminal mismatch for {}",
status_str
);
}
}
/// DB is_cancellable() must agree with state_machine::is_cancellable:
/// only pre-deploy/in-deploy states can be cancelled.
#[sqlx::test]
async fn db_is_cancellable_matches_rust_is_cancellable(pool: PgPool) {
let statuses = vec![
("Pending", true),
("Building", true),
("Pushing", true),
("Pushed", true),
("Deploying", true),
("Healthy", false),
("Unhealthy", false),
("Cancelling", false),
("Terminating", false),
("Cancelled", false),
("Stopped", false),
("Superseded", false),
("Failed", false),
("Expired", false),
];
for (status_str, expected) in statuses {
let result: bool = sqlx::query_scalar("SELECT is_cancellable($1)")
.bind(status_str)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
result, expected,
"is_cancellable({}) returned {} but expected {}",
status_str, result, expected
);
let status = str_to_status(status_str);
assert_eq!(
state_machine::is_cancellable(&status),
expected,
"Rust is_cancellable mismatch for {}",
status_str
);
}
}
/// DB is_active() must agree with state_machine::is_active: only
/// 'Healthy'/'Unhealthy' count as an actively running deployment.
#[sqlx::test]
async fn db_is_active_matches_rust_is_active(pool: PgPool) {
let statuses = vec![
("Pending", false),
("Building", false),
("Pushing", false),
("Pushed", false),
("Deploying", false),
("Healthy", true),
("Unhealthy", true),
("Cancelling", false),
("Terminating", false),
("Cancelled", false),
("Stopped", false),
("Superseded", false),
("Failed", false),
("Expired", false),
];
for (status_str, expected) in statuses {
let result: bool = sqlx::query_scalar("SELECT is_active($1)")
.bind(status_str)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
result, expected,
"is_active({}) returned {} but expected {}",
status_str, result, expected
);
let status = str_to_status(status_str);
assert_eq!(
state_machine::is_active(&status),
expected,
"Rust is_active mismatch for {}",
status_str
);
}
}
/// DB is_protected() must be TRUE for all terminal states plus the two
/// in-flight cleanup states (Cancelling/Terminating). DB-only check — there
/// is no Rust counterpart asserted here.
#[sqlx::test]
async fn db_is_protected_includes_terminal_and_cleanup(pool: PgPool) {
let statuses = vec![
("Pending", false),
("Building", false),
("Pushing", false),
("Pushed", false),
("Deploying", false),
("Healthy", false),
("Unhealthy", false),
// cleanup + terminal states are all protected
("Cancelling", true), ("Terminating", true), ("Cancelled", true), ("Stopped", true), ("Superseded", true), ("Failed", true), ("Expired", true), ];
for (status_str, expected) in statuses {
let result: bool = sqlx::query_scalar("SELECT is_protected($1)")
.bind(status_str)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
result, expected,
"is_protected({}) returned {} but expected {}",
status_str, result, expected
);
}
}
/// End-to-end check of update_status: `deploying_started_at` is stamped on
/// the first Pushed -> Deploying transition and never overwritten afterwards
/// (neither by a repeated Deploying update nor by moving on to Healthy).
#[sqlx::test]
async fn deploying_started_at_set_once_on_deploying_transition(pool: PgPool) {
use uuid::Uuid;
let project_id = Uuid::new_v4();
let user_id = Uuid::new_v4();
// Satisfy the FK chain: deployments -> projects -> users.
sqlx::query!(
"INSERT INTO users (id, email) VALUES ($1, $2)",
user_id,
"test@example.com"
)
.execute(&pool)
.await
.unwrap();
sqlx::query!(
"INSERT INTO projects (id, name, owner_user_id, access_class, status) VALUES ($1, $2, $3, $4, $5)",
project_id,
"test-project",
user_id,
"public",
"Stopped"
)
.execute(&pool)
.await
.unwrap();
let deployment = create(
&pool,
CreateDeploymentParams {
deployment_id: "test-deploy",
project_id,
created_by_id: user_id,
status: DeploymentStatus::Pushed,
image: None,
image_digest: None,
rolled_back_from_deployment_id: None,
deployment_group: "default",
expires_at: None,
http_port: 8080,
is_active: false,
},
)
.await
.unwrap();
assert!(deployment.deploying_started_at.is_none());
let deployment = update_status(&pool, deployment.id, DeploymentStatus::Deploying)
.await
.unwrap();
assert!(deployment.deploying_started_at.is_some());
let first_timestamp = deployment.deploying_started_at.unwrap();
// Sleep so a (wrong) re-stamp would produce a visibly different NOW().
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let deployment = update_status(&pool, deployment.id, DeploymentStatus::Deploying)
.await
.unwrap();
assert_eq!(deployment.deploying_started_at, Some(first_timestamp));
let deployment = update_status(&pool, deployment.id, DeploymentStatus::Healthy)
.await
.unwrap();
assert_eq!(deployment.deploying_started_at, Some(first_timestamp));
}
}