ff_backend_postgres/reconcilers/
execution_deadline.rs1use ff_core::engine_error::EngineError;
20use sqlx::{PgPool, Row};
21
22use crate::error::map_sqlx_error;
23use crate::lease_event;
24
25pub async fn expire_for_execution(
32 pool: &PgPool,
33 partition_key: i16,
34 exec_uuid: uuid::Uuid,
35 now_ms: i64,
36) -> Result<(), EngineError> {
37 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
38
39 let core_row = sqlx::query(
40 r#"
41 SELECT attempt_index, lifecycle_phase, deadline_at_ms
42 FROM ff_exec_core
43 WHERE partition_key = $1 AND execution_id = $2
44 FOR UPDATE
45 "#,
46 )
47 .bind(partition_key)
48 .bind(exec_uuid)
49 .fetch_optional(&mut *tx)
50 .await
51 .map_err(map_sqlx_error)?;
52
53 let Some(core) = core_row else {
54 tx.rollback().await.map_err(map_sqlx_error)?;
55 return Ok(());
56 };
57 let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
58 let deadline_at: Option<i64> = core
59 .try_get::<Option<i64>, _>("deadline_at_ms")
60 .map_err(map_sqlx_error)?;
61 if phase != "active" || !matches!(deadline_at, Some(d) if d < now_ms) {
62 tx.rollback().await.map_err(map_sqlx_error)?;
63 return Ok(());
64 }
65 let attempt_index: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
66
67 sqlx::query(
72 r#"
73 UPDATE ff_attempt
74 SET outcome = 'interrupted',
75 lease_expires_at_ms = NULL,
76 terminal_at_ms = $1
77 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
78 "#,
79 )
80 .bind(now_ms)
81 .bind(partition_key)
82 .bind(exec_uuid)
83 .bind(attempt_index)
84 .execute(&mut *tx)
85 .await
86 .map_err(map_sqlx_error)?;
87
88 sqlx::query(
89 r#"
90 UPDATE ff_exec_core
91 SET lifecycle_phase = 'terminal',
92 ownership_state = 'unowned',
93 eligibility_state = 'not_applicable',
94 attempt_state = 'attempt_terminal',
95 terminal_at_ms = $1,
96 raw_fields = raw_fields || jsonb_build_object(
97 'last_failure_message', 'execution_deadline'
98 )
99 WHERE partition_key = $2 AND execution_id = $3
100 "#,
101 )
102 .bind(now_ms)
103 .bind(partition_key)
104 .bind(exec_uuid)
105 .execute(&mut *tx)
106 .await
107 .map_err(map_sqlx_error)?;
108
109 sqlx::query(
110 r#"
111 INSERT INTO ff_completion_event (
112 partition_key, execution_id, flow_id, outcome,
113 namespace, instance_tag, occurred_at_ms
114 )
115 SELECT partition_key, execution_id, flow_id, 'expired',
116 NULL, NULL, $3
117 FROM ff_exec_core
118 WHERE partition_key = $1 AND execution_id = $2
119 "#,
120 )
121 .bind(partition_key)
122 .bind(exec_uuid)
123 .bind(now_ms)
124 .execute(&mut *tx)
125 .await
126 .map_err(map_sqlx_error)?;
127
128 lease_event::emit(
131 &mut tx,
132 partition_key,
133 exec_uuid,
134 None,
135 lease_event::EVENT_EXPIRED,
136 now_ms,
137 )
138 .await?;
139
140 tx.commit().await.map_err(map_sqlx_error)?;
141 Ok(())
142}