ff_backend_postgres/reconcilers/
attempt_timeout.rs1use ff_core::backend::ScannerFilter;
19use ff_core::engine_error::EngineError;
20use serde_json::Value as JsonValue;
21use sqlx::{PgPool, Row};
22
23use crate::error::map_sqlx_error;
24use crate::lease_event;
25use crate::reconcilers::ScanReport;
26
/// Upper bound on how many expired attempts a single `scan_tick` call
/// selects (bound as the candidate query's `LIMIT`).
const BATCH_SIZE: i64 = 1_000;

/// Slack, in milliseconds, subtracted from the current time to form the
/// lease-expiry cutoff in `scan_tick`. With 0, a lease becomes a candidate
/// as soon as its `lease_expires_at_ms` is strictly in the past.
const GRACE_MS: i64 = 0;
33pub async fn scan_tick(
35 pool: &PgPool,
36 partition_key: i16,
37 filter: &ScannerFilter,
38) -> Result<ScanReport, EngineError> {
39 let now_ms: i64 = i64::try_from(
40 std::time::SystemTime::now()
41 .duration_since(std::time::UNIX_EPOCH)
42 .map(|d| d.as_millis())
43 .unwrap_or(0),
44 )
45 .unwrap_or(i64::MAX);
46 let cutoff = now_ms.saturating_sub(GRACE_MS);
47
48 let rows = sqlx::query(
49 r#"
50 SELECT a.execution_id, a.attempt_index
51 FROM ff_attempt a
52 JOIN ff_exec_core c
53 ON c.partition_key = a.partition_key
54 AND c.execution_id = a.execution_id
55 WHERE a.partition_key = $1
56 AND a.lease_expires_at_ms IS NOT NULL
57 AND a.lease_expires_at_ms < $2
58 AND c.lifecycle_phase = 'active'
59 ORDER BY a.lease_expires_at_ms ASC
60 LIMIT $3
61 "#,
62 )
63 .bind(partition_key)
64 .bind(cutoff)
65 .bind(BATCH_SIZE)
66 .fetch_all(pool)
67 .await
68 .map_err(map_sqlx_error)?;
69
70 let mut report = ScanReport::default();
71 for row in rows {
72 let exec_uuid: uuid::Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
73 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
74 if skip_by_filter(pool, partition_key, exec_uuid, filter).await {
75 continue;
76 }
77 match expire_one(pool, partition_key, exec_uuid, attempt_index, now_ms).await {
78 Ok(()) => report.processed += 1,
79 Err(e) => {
80 tracing::warn!(
81 partition = partition_key,
82 execution = %exec_uuid,
83 attempt_index,
84 error = %e,
85 "attempt_timeout reconciler: row expiry failed",
86 );
87 report.errors += 1;
88 }
89 }
90 }
91 Ok(report)
92}
93
94async fn expire_one(
95 pool: &PgPool,
96 partition_key: i16,
97 exec_uuid: uuid::Uuid,
98 attempt_index: i32,
99 now_ms: i64,
100) -> Result<(), EngineError> {
101 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
102
103 let att_row = sqlx::query(
104 r#"
105 SELECT lease_expires_at_ms
106 FROM ff_attempt
107 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
108 FOR UPDATE
109 "#,
110 )
111 .bind(partition_key)
112 .bind(exec_uuid)
113 .bind(attempt_index)
114 .fetch_optional(&mut *tx)
115 .await
116 .map_err(map_sqlx_error)?;
117
118 let Some(att) = att_row else {
119 tx.rollback().await.map_err(map_sqlx_error)?;
120 return Ok(());
121 };
122 let expires_at: Option<i64> = att
123 .try_get::<Option<i64>, _>("lease_expires_at_ms")
124 .map_err(map_sqlx_error)?;
125 if !matches!(expires_at, Some(e) if e < now_ms) {
126 tx.rollback().await.map_err(map_sqlx_error)?;
127 return Ok(());
128 }
129
130 let core_row = sqlx::query(
131 r#"
132 SELECT attempt_index, lifecycle_phase, policy
133 FROM ff_exec_core
134 WHERE partition_key = $1 AND execution_id = $2
135 FOR UPDATE
136 "#,
137 )
138 .bind(partition_key)
139 .bind(exec_uuid)
140 .fetch_optional(&mut *tx)
141 .await
142 .map_err(map_sqlx_error)?;
143
144 let Some(core) = core_row else {
145 tx.rollback().await.map_err(map_sqlx_error)?;
146 return Ok(());
147 };
148 let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
149 if phase != "active" {
150 tx.rollback().await.map_err(map_sqlx_error)?;
151 return Ok(());
152 }
153 let cur_attempt: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
154 let policy: Option<JsonValue> = core.try_get("policy").map_err(map_sqlx_error)?;
155 let max_retries = extract_max_retries(policy.as_ref());
156 let retries_remain = (cur_attempt as i64) < (max_retries as i64);
157
158 sqlx::query(
159 r#"
160 UPDATE ff_attempt
161 SET outcome = 'interrupted',
162 lease_expires_at_ms = NULL,
163 terminal_at_ms = $1
164 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
165 "#,
166 )
167 .bind(now_ms)
168 .bind(partition_key)
169 .bind(exec_uuid)
170 .bind(attempt_index)
171 .execute(&mut *tx)
172 .await
173 .map_err(map_sqlx_error)?;
174
175 if retries_remain {
176 sqlx::query(
177 r#"
178 UPDATE ff_exec_core
179 SET lifecycle_phase = 'runnable',
180 ownership_state = 'unowned',
181 eligibility_state = 'eligible_now',
182 attempt_state = 'pending_retry_attempt',
183 attempt_index = attempt_index + 1,
184 raw_fields = raw_fields || jsonb_build_object(
185 'last_failure_message', 'attempt_timeout'
186 )
187 WHERE partition_key = $1 AND execution_id = $2
188 "#,
189 )
190 .bind(partition_key)
191 .bind(exec_uuid)
192 .execute(&mut *tx)
193 .await
194 .map_err(map_sqlx_error)?;
195 } else {
196 sqlx::query(
197 r#"
198 UPDATE ff_exec_core
199 SET lifecycle_phase = 'terminal',
200 ownership_state = 'unowned',
201 eligibility_state = 'not_applicable',
202 attempt_state = 'attempt_terminal',
203 terminal_at_ms = $1,
204 raw_fields = raw_fields || jsonb_build_object(
205 'last_failure_message', 'attempt_timeout'
206 )
207 WHERE partition_key = $2 AND execution_id = $3
208 "#,
209 )
210 .bind(now_ms)
211 .bind(partition_key)
212 .bind(exec_uuid)
213 .execute(&mut *tx)
214 .await
215 .map_err(map_sqlx_error)?;
216
217 sqlx::query(
218 r#"
219 INSERT INTO ff_completion_event (
220 partition_key, execution_id, flow_id, outcome,
221 namespace, instance_tag, occurred_at_ms
222 )
223 SELECT partition_key, execution_id, flow_id, 'expired',
224 NULL, NULL, $3
225 FROM ff_exec_core
226 WHERE partition_key = $1 AND execution_id = $2
227 "#,
228 )
229 .bind(partition_key)
230 .bind(exec_uuid)
231 .bind(now_ms)
232 .execute(&mut *tx)
233 .await
234 .map_err(map_sqlx_error)?;
235 }
236
237 lease_event::emit(
239 &mut tx,
240 partition_key,
241 exec_uuid,
242 None,
243 lease_event::EVENT_EXPIRED,
244 now_ms,
245 )
246 .await?;
247
248 tx.commit().await.map_err(map_sqlx_error)?;
249 Ok(())
250}
251
252fn extract_max_retries(policy: Option<&JsonValue>) -> u32 {
255 policy
256 .and_then(|v| v.get("retry_policy"))
257 .and_then(|rp| rp.get("max_retries"))
258 .and_then(|m| m.as_u64())
259 .and_then(|n| u32::try_from(n).ok())
260 .unwrap_or(0)
261}
262
263pub(crate) async fn skip_by_filter(
266 pool: &PgPool,
267 partition_key: i16,
268 exec_uuid: uuid::Uuid,
269 filter: &ScannerFilter,
270) -> bool {
271 if filter.is_noop() {
272 return false;
273 }
274 let row = sqlx::query(
275 r#"
276 SELECT raw_fields->>'namespace' AS ns,
277 raw_fields->'tags' AS tags
278 FROM ff_exec_core
279 WHERE partition_key = $1 AND execution_id = $2
280 "#,
281 )
282 .bind(partition_key)
283 .bind(exec_uuid)
284 .fetch_optional(pool)
285 .await;
286 let Ok(Some(row)) = row else {
287 return true;
288 };
289 if let Some(ref want_ns) = filter.namespace {
290 let got: Option<String> = row.try_get("ns").ok().flatten();
291 if got.as_deref() != Some(want_ns.as_str()) {
292 return true;
293 }
294 }
295 if let Some((ref k, ref v)) = filter.instance_tag {
296 let tags: Option<JsonValue> = row.try_get("tags").ok();
297 let got = tags
298 .as_ref()
299 .and_then(|t| t.get(k))
300 .and_then(|t| t.as_str());
301 if got != Some(v.as_str()) {
302 return true;
303 }
304 }
305 false
306}