use ff_core::engine_error::EngineError;
use sqlx::{PgPool, Row};
use crate::error::map_sqlx_error;
pub async fn promote_for_execution(
pool: &PgPool,
partition_key: i16,
exec_uuid: uuid::Uuid,
now_ms: i64,
) -> Result<(), EngineError> {
let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
let row = sqlx::query(
r#"
SELECT lifecycle_phase, eligibility_state, deadline_at_ms
FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2
FOR UPDATE
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(());
};
let phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
let elig: String = row.try_get("eligibility_state").map_err(map_sqlx_error)?;
let delay_until: Option<i64> = row
.try_get::<Option<i64>, _>("deadline_at_ms")
.map_err(map_sqlx_error)?;
if phase != "runnable"
|| elig != "not_eligible_until_time"
|| !matches!(delay_until, Some(t) if t <= now_ms)
{
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(());
}
sqlx::query(
r#"
UPDATE ff_exec_core
SET eligibility_state = 'eligible_now',
deadline_at_ms = NULL
WHERE partition_key = $1 AND execution_id = $2
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
tx.commit().await.map_err(map_sqlx_error)?;
Ok(())
}