ff_backend_postgres/reconcilers/
attempt_timeout.rs1use ff_core::backend::ScannerFilter;
19use ff_core::engine_error::EngineError;
20use serde_json::Value as JsonValue;
21use sqlx::{PgPool, Row};
22
23use crate::error::map_sqlx_error;
24use crate::reconcilers::ScanReport;
25
/// Upper bound on how many expired-lease attempts a single `scan_tick` call
/// fetches for one partition (bound to the candidate query's `LIMIT`).
const BATCH_SIZE: i64 = 1_000;

/// Milliseconds subtracted from the current time to form the expiry cutoff.
/// With `0`, a lease is a candidate as soon as `lease_expires_at_ms` is
/// strictly earlier than "now".
const GRACE_MS: i64 = 0;

32pub async fn scan_tick(
34 pool: &PgPool,
35 partition_key: i16,
36 filter: &ScannerFilter,
37) -> Result<ScanReport, EngineError> {
38 let now_ms: i64 = i64::try_from(
39 std::time::SystemTime::now()
40 .duration_since(std::time::UNIX_EPOCH)
41 .map(|d| d.as_millis())
42 .unwrap_or(0),
43 )
44 .unwrap_or(i64::MAX);
45 let cutoff = now_ms.saturating_sub(GRACE_MS);
46
47 let rows = sqlx::query(
48 r#"
49 SELECT a.execution_id, a.attempt_index
50 FROM ff_attempt a
51 JOIN ff_exec_core c
52 ON c.partition_key = a.partition_key
53 AND c.execution_id = a.execution_id
54 WHERE a.partition_key = $1
55 AND a.lease_expires_at_ms IS NOT NULL
56 AND a.lease_expires_at_ms < $2
57 AND c.lifecycle_phase = 'active'
58 ORDER BY a.lease_expires_at_ms ASC
59 LIMIT $3
60 "#,
61 )
62 .bind(partition_key)
63 .bind(cutoff)
64 .bind(BATCH_SIZE)
65 .fetch_all(pool)
66 .await
67 .map_err(map_sqlx_error)?;
68
69 let mut report = ScanReport::default();
70 for row in rows {
71 let exec_uuid: uuid::Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
72 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
73 if skip_by_filter(pool, partition_key, exec_uuid, filter).await {
74 continue;
75 }
76 match expire_one(pool, partition_key, exec_uuid, attempt_index, now_ms).await {
77 Ok(()) => report.processed += 1,
78 Err(e) => {
79 tracing::warn!(
80 partition = partition_key,
81 execution = %exec_uuid,
82 attempt_index,
83 error = %e,
84 "attempt_timeout reconciler: row expiry failed",
85 );
86 report.errors += 1;
87 }
88 }
89 }
90 Ok(report)
91}
92
93async fn expire_one(
94 pool: &PgPool,
95 partition_key: i16,
96 exec_uuid: uuid::Uuid,
97 attempt_index: i32,
98 now_ms: i64,
99) -> Result<(), EngineError> {
100 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
101
102 let att_row = sqlx::query(
103 r#"
104 SELECT lease_expires_at_ms
105 FROM ff_attempt
106 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
107 FOR UPDATE
108 "#,
109 )
110 .bind(partition_key)
111 .bind(exec_uuid)
112 .bind(attempt_index)
113 .fetch_optional(&mut *tx)
114 .await
115 .map_err(map_sqlx_error)?;
116
117 let Some(att) = att_row else {
118 tx.rollback().await.map_err(map_sqlx_error)?;
119 return Ok(());
120 };
121 let expires_at: Option<i64> = att
122 .try_get::<Option<i64>, _>("lease_expires_at_ms")
123 .map_err(map_sqlx_error)?;
124 if !matches!(expires_at, Some(e) if e < now_ms) {
125 tx.rollback().await.map_err(map_sqlx_error)?;
126 return Ok(());
127 }
128
129 let core_row = sqlx::query(
130 r#"
131 SELECT attempt_index, lifecycle_phase, policy
132 FROM ff_exec_core
133 WHERE partition_key = $1 AND execution_id = $2
134 FOR UPDATE
135 "#,
136 )
137 .bind(partition_key)
138 .bind(exec_uuid)
139 .fetch_optional(&mut *tx)
140 .await
141 .map_err(map_sqlx_error)?;
142
143 let Some(core) = core_row else {
144 tx.rollback().await.map_err(map_sqlx_error)?;
145 return Ok(());
146 };
147 let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
148 if phase != "active" {
149 tx.rollback().await.map_err(map_sqlx_error)?;
150 return Ok(());
151 }
152 let cur_attempt: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
153 let policy: Option<JsonValue> = core.try_get("policy").map_err(map_sqlx_error)?;
154 let max_retries = extract_max_retries(policy.as_ref());
155 let retries_remain = (cur_attempt as i64) < (max_retries as i64);
156
157 sqlx::query(
158 r#"
159 UPDATE ff_attempt
160 SET outcome = 'interrupted',
161 lease_expires_at_ms = NULL,
162 terminal_at_ms = $1
163 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
164 "#,
165 )
166 .bind(now_ms)
167 .bind(partition_key)
168 .bind(exec_uuid)
169 .bind(attempt_index)
170 .execute(&mut *tx)
171 .await
172 .map_err(map_sqlx_error)?;
173
174 if retries_remain {
175 sqlx::query(
176 r#"
177 UPDATE ff_exec_core
178 SET lifecycle_phase = 'runnable',
179 ownership_state = 'unowned',
180 eligibility_state = 'eligible_now',
181 attempt_state = 'pending_retry_attempt',
182 attempt_index = attempt_index + 1,
183 raw_fields = raw_fields || jsonb_build_object(
184 'last_failure_message', 'attempt_timeout'
185 )
186 WHERE partition_key = $1 AND execution_id = $2
187 "#,
188 )
189 .bind(partition_key)
190 .bind(exec_uuid)
191 .execute(&mut *tx)
192 .await
193 .map_err(map_sqlx_error)?;
194 } else {
195 sqlx::query(
196 r#"
197 UPDATE ff_exec_core
198 SET lifecycle_phase = 'terminal',
199 ownership_state = 'unowned',
200 eligibility_state = 'not_applicable',
201 attempt_state = 'attempt_terminal',
202 terminal_at_ms = $1,
203 raw_fields = raw_fields || jsonb_build_object(
204 'last_failure_message', 'attempt_timeout'
205 )
206 WHERE partition_key = $2 AND execution_id = $3
207 "#,
208 )
209 .bind(now_ms)
210 .bind(partition_key)
211 .bind(exec_uuid)
212 .execute(&mut *tx)
213 .await
214 .map_err(map_sqlx_error)?;
215
216 sqlx::query(
217 r#"
218 INSERT INTO ff_completion_event (
219 partition_key, execution_id, flow_id, outcome,
220 namespace, instance_tag, occurred_at_ms
221 )
222 SELECT partition_key, execution_id, flow_id, 'expired',
223 NULL, NULL, $3
224 FROM ff_exec_core
225 WHERE partition_key = $1 AND execution_id = $2
226 "#,
227 )
228 .bind(partition_key)
229 .bind(exec_uuid)
230 .bind(now_ms)
231 .execute(&mut *tx)
232 .await
233 .map_err(map_sqlx_error)?;
234 }
235
236 tx.commit().await.map_err(map_sqlx_error)?;
237 Ok(())
238}
239
240fn extract_max_retries(policy: Option<&JsonValue>) -> u32 {
243 policy
244 .and_then(|v| v.get("retry_policy"))
245 .and_then(|rp| rp.get("max_retries"))
246 .and_then(|m| m.as_u64())
247 .and_then(|n| u32::try_from(n).ok())
248 .unwrap_or(0)
249}
250
251pub(crate) async fn skip_by_filter(
254 pool: &PgPool,
255 partition_key: i16,
256 exec_uuid: uuid::Uuid,
257 filter: &ScannerFilter,
258) -> bool {
259 if filter.is_noop() {
260 return false;
261 }
262 let row = sqlx::query(
263 r#"
264 SELECT raw_fields->>'namespace' AS ns,
265 raw_fields->'tags' AS tags
266 FROM ff_exec_core
267 WHERE partition_key = $1 AND execution_id = $2
268 "#,
269 )
270 .bind(partition_key)
271 .bind(exec_uuid)
272 .fetch_optional(pool)
273 .await;
274 let Ok(Some(row)) = row else {
275 return true;
276 };
277 if let Some(ref want_ns) = filter.namespace {
278 let got: Option<String> = row.try_get("ns").ok().flatten();
279 if got.as_deref() != Some(want_ns.as_str()) {
280 return true;
281 }
282 }
283 if let Some((ref k, ref v)) = filter.instance_tag {
284 let tags: Option<JsonValue> = row.try_get("tags").ok();
285 let got = tags
286 .as_ref()
287 .and_then(|t| t.get(k))
288 .and_then(|t| t.as_str());
289 if got != Some(v.as_str()) {
290 return true;
291 }
292 }
293 false
294}