use ff_core::engine_error::EngineError;
use serde_json::{Value as JsonValue, json};
use sqlx::{PgPool, Row};
use crate::error::map_sqlx_error;
pub async fn close_for_execution(
pool: &PgPool,
partition_key: i16,
waitpoint_uuid: uuid::Uuid,
now_ms: i64,
) -> Result<(), EngineError> {
let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
let row = sqlx::query(
r#"
SELECT execution_id, waitpoint_key, expires_at_ms
FROM ff_waitpoint_pending
WHERE partition_key = $1 AND waitpoint_id = $2
FOR UPDATE
"#,
)
.bind(partition_key)
.bind(waitpoint_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(());
};
let exec_uuid: uuid::Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
let waitpoint_key: String = row.try_get("waitpoint_key").map_err(map_sqlx_error)?;
let expires_at: Option<i64> = row
.try_get::<Option<i64>, _>("expires_at_ms")
.map_err(map_sqlx_error)?;
if !matches!(expires_at, Some(t) if t < now_ms) {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(());
}
sqlx::query(
r#"
DELETE FROM ff_waitpoint_pending
WHERE partition_key = $1 AND waitpoint_id = $2
"#,
)
.bind(partition_key)
.bind(waitpoint_uuid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let susp_row = sqlx::query(
r#"
SELECT member_map
FROM ff_suspension_current
WHERE partition_key = $1 AND execution_id = $2
FOR UPDATE
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
if let Some(susp_row) = susp_row {
let member_map: JsonValue = susp_row.try_get("member_map").map_err(map_sqlx_error)?;
let mut map = match member_map {
JsonValue::Object(m) => m,
_ => serde_json::Map::new(),
};
let key = if waitpoint_key.is_empty() {
waitpoint_uuid.to_string()
} else {
waitpoint_key
};
map.insert(
key,
json!({
"status": "never_committed",
"closed_at_ms": now_ms,
"source": "pending_wp_expiry",
}),
);
sqlx::query(
r#"
UPDATE ff_suspension_current
SET member_map = $1
WHERE partition_key = $2 AND execution_id = $3
"#,
)
.bind(JsonValue::Object(map))
.bind(partition_key)
.bind(exec_uuid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
}
tx.commit().await.map_err(map_sqlx_error)?;
Ok(())
}