use runledger_core::jobs::WorkflowType;
use sqlx::types::Uuid;
use crate::{DbPool, DbTx, Result};
use super::super::row_decode::{
parse_job_stage, parse_job_type_name, parse_step_key_name, parse_workflow_release_mode,
parse_workflow_run_status, parse_workflow_step_execution_kind, parse_workflow_step_status,
parse_workflow_type_name,
};
use super::super::workflow_types::{
WorkflowRunDbRecord, WorkflowRunListFilter, WorkflowStepDbRecord,
WorkflowStepDependencyDbRecord,
};
pub async fn get_workflow_run_by_id(
pool: &DbPool,
organization_id: Option<Uuid>,
workflow_run_id: Uuid,
) -> Result<Option<WorkflowRunDbRecord>> {
let row = sqlx::query!(
"SELECT
id,
workflow_type,
organization_id,
status::text AS \"status!\",
idempotency_key,
metadata,
started_at,
finished_at,
created_at,
updated_at
FROM workflow_runs
WHERE id = $1
AND ($2::uuid IS NULL OR organization_id = $2)
LIMIT 1",
workflow_run_id,
organization_id,
)
.fetch_optional(pool)
.await
.map_err(|error| crate::Error::from_query_sqlx_with_context("get workflow run by id", error))?;
row.map(|row| {
Ok(WorkflowRunDbRecord {
id: row.id,
workflow_type: parse_workflow_type_name(row.workflow_type)?,
organization_id: row.organization_id,
status: parse_workflow_run_status(row.status)?,
idempotency_key: row.idempotency_key,
metadata: row.metadata,
started_at: row.started_at,
finished_at: row.finished_at,
created_at: row.created_at,
updated_at: row.updated_at,
})
})
.transpose()
}
pub async fn list_workflow_steps(
pool: &DbPool,
organization_id: Option<Uuid>,
workflow_run_id: Uuid,
) -> Result<Vec<WorkflowStepDbRecord>> {
let rows = sqlx::query!(
"SELECT
ws.id,
ws.workflow_run_id,
ws.step_key,
ws.execution_kind::text AS \"execution_kind!\",
ws.job_type,
ws.organization_id,
ws.payload,
ws.priority,
ws.max_attempts,
ws.timeout_seconds,
ws.stage,
ws.status::text AS \"status!\",
ws.job_id,
ws.released_at,
ws.started_at,
ws.finished_at,
ws.dependency_count_total,
ws.dependency_count_pending,
ws.dependency_count_unsatisfied,
ws.status_reason,
ws.last_error_code,
ws.last_error_message,
ws.created_at,
ws.updated_at
FROM workflow_steps ws
JOIN workflow_runs wr ON wr.id = ws.workflow_run_id
WHERE ws.workflow_run_id = $1
AND ($2::uuid IS NULL OR wr.organization_id = $2)
ORDER BY ws.created_at ASC",
workflow_run_id,
organization_id,
)
.fetch_all(pool)
.await
.map_err(|error| crate::Error::from_query_sqlx_with_context("list workflow steps", error))?;
rows.into_iter()
.map(|row| {
Ok(WorkflowStepDbRecord {
id: row.id,
workflow_run_id: row.workflow_run_id,
step_key: parse_step_key_name(row.step_key)?,
execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
job_type: row.job_type.map(parse_job_type_name).transpose()?,
organization_id: row.organization_id,
payload: row.payload,
priority: row.priority,
max_attempts: row.max_attempts,
timeout_seconds: row.timeout_seconds,
stage: row.stage.map(parse_job_stage).transpose()?,
status: parse_workflow_step_status(row.status)?,
job_id: row.job_id,
released_at: row.released_at,
started_at: row.started_at,
finished_at: row.finished_at,
dependency_count_total: row.dependency_count_total,
dependency_count_pending: row.dependency_count_pending,
dependency_count_unsatisfied: row.dependency_count_unsatisfied,
status_reason: row.status_reason,
last_error_code: row.last_error_code,
last_error_message: row.last_error_message,
created_at: row.created_at,
updated_at: row.updated_at,
})
})
.collect()
}
/// Lists workflow runs matching `filter`, newest first, one page at a time.
///
/// Each filter field is optional and skipped when `None`:
///
/// * `organization_id` — exact match on the run's organization.
/// * `status` — exact match on the run status (sent as its database text
///   value and cast to `workflow_run_status`).
/// * `workflow_type` — case-insensitive substring match (`ILIKE '%…%'`).
///   `%` and `_` inside the filter value are not escaped, so they act as
///   pattern wildcards.
///
/// `filter.limit` and `filter.offset` are passed straight to `LIMIT` /
/// `OFFSET`.
///
/// Rows are ordered by `created_at DESC, id DESC`. The `id` tie-breaker gives
/// rows that share a `created_at` a fixed relative order, so consecutive
/// pages neither repeat nor skip such rows.
///
/// # Errors
///
/// Returns the error produced by `crate::Error::from_query_sqlx_with_context`
/// when the query fails, and propagates the first error returned by
/// `parse_workflow_type_name` or `parse_workflow_run_status`.
pub async fn list_workflow_runs(
    pool: &DbPool,
    filter: &WorkflowRunListFilter<'_>,
) -> Result<Vec<WorkflowRunDbRecord>> {
    let status_text = filter.status.map(|status| status.as_db_value());

    // LIMIT/OFFSET paging needs a total order: `created_at` alone is not
    // unique, so `id` is appended as a deterministic tie-breaker.
    let rows = sqlx::query!(
        "SELECT
id,
workflow_type,
organization_id,
status::text AS \"status!\",
idempotency_key,
metadata,
started_at,
finished_at,
created_at,
updated_at
FROM workflow_runs
WHERE ($1::uuid IS NULL OR organization_id = $1)
AND ($2::text IS NULL OR status = $2::text::workflow_run_status)
AND ($3::text IS NULL OR workflow_type ILIKE '%' || $3 || '%')
ORDER BY created_at DESC, id DESC
LIMIT $4 OFFSET $5",
        filter.organization_id,
        status_text,
        filter.workflow_type,
        filter.limit,
        filter.offset,
    )
    .fetch_all(pool)
    .await
    .map_err(|error| crate::Error::from_query_sqlx_with_context("list workflow runs", error))?;

    rows.into_iter()
        .map(|row| {
            Ok(WorkflowRunDbRecord {
                id: row.id,
                workflow_type: parse_workflow_type_name(row.workflow_type)?,
                organization_id: row.organization_id,
                status: parse_workflow_run_status(row.status)?,
                idempotency_key: row.idempotency_key,
                metadata: row.metadata,
                started_at: row.started_at,
                finished_at: row.finished_at,
                created_at: row.created_at,
                updated_at: row.updated_at,
            })
        })
        .collect()
}
pub async fn get_latest_workflow_run_by_type(
pool: &DbPool,
organization_id: Option<Uuid>,
workflow_type: WorkflowType<'_>,
) -> Result<Option<WorkflowRunDbRecord>> {
let row = sqlx::query!(
"SELECT
id,
workflow_type,
organization_id,
status::text AS \"status!\",
idempotency_key,
metadata,
started_at,
finished_at,
created_at,
updated_at
FROM workflow_runs
WHERE ($1::uuid IS NULL OR organization_id = $1)
AND workflow_type = $2
ORDER BY created_at DESC
LIMIT 1",
organization_id,
workflow_type as _,
)
.fetch_optional(pool)
.await
.map_err(|error| {
crate::Error::from_query_sqlx_with_context("get latest workflow run by type", error)
})?;
let Some(row) = row else {
return Ok(None);
};
Ok(Some(WorkflowRunDbRecord {
id: row.id,
workflow_type: parse_workflow_type_name(row.workflow_type)?,
organization_id: row.organization_id,
status: parse_workflow_run_status(row.status)?,
idempotency_key: row.idempotency_key,
metadata: row.metadata,
started_at: row.started_at,
finished_at: row.finished_at,
created_at: row.created_at,
updated_at: row.updated_at,
}))
}
pub async fn list_workflow_step_dependencies(
pool: &DbPool,
organization_id: Option<Uuid>,
workflow_run_id: Uuid,
) -> Result<Vec<WorkflowStepDependencyDbRecord>> {
let rows = sqlx::query!(
"SELECT
wsd.workflow_run_id,
wsd.prerequisite_step_id,
wsd.dependent_step_id,
wsd.release_mode::text AS \"release_mode!\",
wsd.created_at
FROM workflow_step_dependencies wsd
JOIN workflow_runs wr ON wr.id = wsd.workflow_run_id
WHERE wsd.workflow_run_id = $1
AND ($2::uuid IS NULL OR wr.organization_id = $2)
ORDER BY
wsd.prerequisite_step_id ASC,
wsd.dependent_step_id ASC",
workflow_run_id,
organization_id,
)
.fetch_all(pool)
.await
.map_err(|error| {
crate::Error::from_query_sqlx_with_context("list workflow step dependencies", error)
})?;
rows.into_iter()
.map(|row| {
Ok(WorkflowStepDependencyDbRecord {
workflow_run_id: row.workflow_run_id,
prerequisite_step_id: row.prerequisite_step_id,
dependent_step_id: row.dependent_step_id,
release_mode: parse_workflow_release_mode(row.release_mode)?,
created_at: row.created_at,
})
})
.collect()
}
pub async fn get_workflow_run_id_for_job(pool: &DbPool, job_id: Uuid) -> Result<Option<Uuid>> {
sqlx::query_scalar!(
"SELECT ws.workflow_run_id FROM workflow_steps ws WHERE ws.job_id = $1",
job_id,
)
.fetch_optional(pool)
.await
.map_err(|error| {
crate::Error::from_query_sqlx_with_context("get workflow run id for job", error)
})
}
pub async fn get_workflow_run_by_type_and_idempotency_key(
pool: &DbPool,
organization_id: Option<Uuid>,
workflow_type: WorkflowType<'_>,
idempotency_key: &str,
) -> Result<Option<WorkflowRunDbRecord>> {
let mut tx = pool
.begin()
.await
.map_err(|error| crate::Error::ConnectionError(error.to_string()))?;
let run = get_workflow_run_by_type_and_idempotency_key_tx(
&mut tx,
organization_id,
workflow_type,
idempotency_key,
)
.await?;
tx.commit()
.await
.map_err(|error| crate::Error::ConnectionError(error.to_string()))?;
Ok(run)
}
/// Finds a workflow run by its type and idempotency key, executing on the
/// caller's transaction.
///
/// When `organization_id` is `Some`, the run must also belong to that
/// organization; with `None` the organization check is skipped.
///
/// Returns `Ok(None)` when no row matches. The transaction is neither
/// committed nor rolled back here.
///
/// # Errors
///
/// Returns the error produced by `crate::Error::from_query_sqlx_with_context`
/// when the query fails, and propagates any error returned by
/// `parse_workflow_type_name` or `parse_workflow_run_status`.
pub async fn get_workflow_run_by_type_and_idempotency_key_tx(
    tx: &mut DbTx<'_>,
    organization_id: Option<Uuid>,
    workflow_type: WorkflowType<'_>,
    idempotency_key: &str,
) -> Result<Option<WorkflowRunDbRecord>> {
    let fetched = sqlx::query!(
        "SELECT
id,
workflow_type,
organization_id,
status::text AS \"status!\",
idempotency_key,
metadata,
started_at,
finished_at,
created_at,
updated_at
FROM workflow_runs
WHERE workflow_type = $1
AND idempotency_key = $2
AND ($3::uuid IS NULL OR organization_id = $3)
LIMIT 1",
        workflow_type as _,
        idempotency_key,
        organization_id,
    )
    .fetch_optional(&mut **tx)
    .await
    .map_err(|source| {
        crate::Error::from_query_sqlx_with_context(
            "get workflow run by type and idempotency key",
            source,
        )
    })?;

    match fetched {
        None => Ok(None),
        Some(found) => Ok(Some(WorkflowRunDbRecord {
            id: found.id,
            workflow_type: parse_workflow_type_name(found.workflow_type)?,
            organization_id: found.organization_id,
            status: parse_workflow_run_status(found.status)?,
            idempotency_key: found.idempotency_key,
            metadata: found.metadata,
            started_at: found.started_at,
            finished_at: found.finished_at,
            created_at: found.created_at,
            updated_at: found.updated_at,
        })),
    }
}