Skip to main content

ff_backend_postgres/reconcilers/
attempt_timeout.rs

//! Postgres `attempt_timeout` reconciler (wave 6c).
//!
//! Mirrors [`ff_engine::scanner::attempt_timeout`] against the Postgres
//! schema. The Valkey side scans the `ff:idx:{p:N}:attempt_timeout` ZSET
//! for attempts whose lease timeout has lapsed and fires
//! `FCALL ff_expire_execution`. On Postgres the authority is the
//! `ff_attempt.lease_expires_at_ms` column (partial index
//! `ff_attempt_lease_expiry_idx`); an attempt is "timed out" iff its
//! lease has expired AND `ff_exec_core.lifecycle_phase = 'active'`
//! (a released lease — `lease_expires_at_ms IS NULL` — is not a
//! timeout; a terminal attempt is also not a timeout).
//!
//! Per-row tx: each timeout action (mark attempt interrupted + transition
//! exec_core to either `runnable` (retries remain) or `terminal`
//! (exhausted) + emit completion event on terminal) is its own
//! `BEGIN/COMMIT`, per spec §Constraints.
17
18use ff_core::backend::ScannerFilter;
19use ff_core::engine_error::EngineError;
20use serde_json::Value as JsonValue;
21use sqlx::{PgPool, Row};
22
23use crate::error::map_sqlx_error;
24use crate::lease_event;
25use crate::reconcilers::ScanReport;
26
/// Upper bound on the number of candidate rows fetched by one scan tick
/// (bound to the `LIMIT` of the candidate query).
const BATCH_SIZE: i64 = 1_000;

/// Grace period in milliseconds, subtracted from "now" to obtain the
/// lease-expiry cutoff. With `0`, a lease qualifies as soon as its expiry
/// is strictly in the past.
const GRACE_MS: i64 = 0;
32
33/// Scan one partition for timed-out attempt leases.
34pub async fn scan_tick(
35    pool: &PgPool,
36    partition_key: i16,
37    filter: &ScannerFilter,
38) -> Result<ScanReport, EngineError> {
39    let now_ms: i64 = i64::try_from(
40        std::time::SystemTime::now()
41            .duration_since(std::time::UNIX_EPOCH)
42            .map(|d| d.as_millis())
43            .unwrap_or(0),
44    )
45    .unwrap_or(i64::MAX);
46    let cutoff = now_ms.saturating_sub(GRACE_MS);
47
48    let rows = sqlx::query(
49        r#"
50        SELECT a.execution_id, a.attempt_index
51          FROM ff_attempt a
52          JOIN ff_exec_core c
53            ON c.partition_key = a.partition_key
54           AND c.execution_id = a.execution_id
55         WHERE a.partition_key = $1
56           AND a.lease_expires_at_ms IS NOT NULL
57           AND a.lease_expires_at_ms < $2
58           AND c.lifecycle_phase = 'active'
59         ORDER BY a.lease_expires_at_ms ASC
60         LIMIT $3
61        "#,
62    )
63    .bind(partition_key)
64    .bind(cutoff)
65    .bind(BATCH_SIZE)
66    .fetch_all(pool)
67    .await
68    .map_err(map_sqlx_error)?;
69
70    let mut report = ScanReport::default();
71    for row in rows {
72        let exec_uuid: uuid::Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
73        let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
74        if skip_by_filter(pool, partition_key, exec_uuid, filter).await {
75            continue;
76        }
77        match expire_one(pool, partition_key, exec_uuid, attempt_index, now_ms).await {
78            Ok(()) => report.processed += 1,
79            Err(e) => {
80                tracing::warn!(
81                    partition = partition_key,
82                    execution = %exec_uuid,
83                    attempt_index,
84                    error = %e,
85                    "attempt_timeout reconciler: row expiry failed",
86                );
87                report.errors += 1;
88            }
89        }
90    }
91    Ok(report)
92}
93
94async fn expire_one(
95    pool: &PgPool,
96    partition_key: i16,
97    exec_uuid: uuid::Uuid,
98    attempt_index: i32,
99    now_ms: i64,
100) -> Result<(), EngineError> {
101    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
102
103    let att_row = sqlx::query(
104        r#"
105        SELECT lease_expires_at_ms
106          FROM ff_attempt
107         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
108         FOR UPDATE
109        "#,
110    )
111    .bind(partition_key)
112    .bind(exec_uuid)
113    .bind(attempt_index)
114    .fetch_optional(&mut *tx)
115    .await
116    .map_err(map_sqlx_error)?;
117
118    let Some(att) = att_row else {
119        tx.rollback().await.map_err(map_sqlx_error)?;
120        return Ok(());
121    };
122    let expires_at: Option<i64> = att
123        .try_get::<Option<i64>, _>("lease_expires_at_ms")
124        .map_err(map_sqlx_error)?;
125    if !matches!(expires_at, Some(e) if e < now_ms) {
126        tx.rollback().await.map_err(map_sqlx_error)?;
127        return Ok(());
128    }
129
130    let core_row = sqlx::query(
131        r#"
132        SELECT attempt_index, lifecycle_phase, policy
133          FROM ff_exec_core
134         WHERE partition_key = $1 AND execution_id = $2
135         FOR UPDATE
136        "#,
137    )
138    .bind(partition_key)
139    .bind(exec_uuid)
140    .fetch_optional(&mut *tx)
141    .await
142    .map_err(map_sqlx_error)?;
143
144    let Some(core) = core_row else {
145        tx.rollback().await.map_err(map_sqlx_error)?;
146        return Ok(());
147    };
148    let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
149    if phase != "active" {
150        tx.rollback().await.map_err(map_sqlx_error)?;
151        return Ok(());
152    }
153    let cur_attempt: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
154    let policy: Option<JsonValue> = core.try_get("policy").map_err(map_sqlx_error)?;
155    let max_retries = extract_max_retries(policy.as_ref());
156    let retries_remain = (cur_attempt as i64) < (max_retries as i64);
157
158    sqlx::query(
159        r#"
160        UPDATE ff_attempt
161           SET outcome = 'interrupted',
162               lease_expires_at_ms = NULL,
163               terminal_at_ms = $1
164         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
165        "#,
166    )
167    .bind(now_ms)
168    .bind(partition_key)
169    .bind(exec_uuid)
170    .bind(attempt_index)
171    .execute(&mut *tx)
172    .await
173    .map_err(map_sqlx_error)?;
174
175    if retries_remain {
176        sqlx::query(
177            r#"
178            UPDATE ff_exec_core
179               SET lifecycle_phase = 'runnable',
180                   ownership_state = 'unowned',
181                   eligibility_state = 'eligible_now',
182                   attempt_state = 'pending_retry_attempt',
183                   attempt_index = attempt_index + 1,
184                   raw_fields = raw_fields || jsonb_build_object(
185                       'last_failure_message', 'attempt_timeout'
186                   )
187             WHERE partition_key = $1 AND execution_id = $2
188            "#,
189        )
190        .bind(partition_key)
191        .bind(exec_uuid)
192        .execute(&mut *tx)
193        .await
194        .map_err(map_sqlx_error)?;
195    } else {
196        sqlx::query(
197            r#"
198            UPDATE ff_exec_core
199               SET lifecycle_phase = 'terminal',
200                   ownership_state = 'unowned',
201                   eligibility_state = 'not_applicable',
202                   attempt_state = 'attempt_terminal',
203                   terminal_at_ms = $1,
204                   raw_fields = raw_fields || jsonb_build_object(
205                       'last_failure_message', 'attempt_timeout'
206                   )
207             WHERE partition_key = $2 AND execution_id = $3
208            "#,
209        )
210        .bind(now_ms)
211        .bind(partition_key)
212        .bind(exec_uuid)
213        .execute(&mut *tx)
214        .await
215        .map_err(map_sqlx_error)?;
216
217        sqlx::query(
218            r#"
219            INSERT INTO ff_completion_event (
220                partition_key, execution_id, flow_id, outcome,
221                namespace, instance_tag, occurred_at_ms
222            )
223            SELECT partition_key, execution_id, flow_id, 'expired',
224                   NULL, NULL, $3
225              FROM ff_exec_core
226             WHERE partition_key = $1 AND execution_id = $2
227            "#,
228        )
229        .bind(partition_key)
230        .bind(exec_uuid)
231        .bind(now_ms)
232        .execute(&mut *tx)
233        .await
234        .map_err(map_sqlx_error)?;
235    }
236
237    // RFC-019 Stage B outbox: lease expired (attempt_timeout reconciler).
238    lease_event::emit(
239        &mut tx,
240        partition_key,
241        exec_uuid,
242        None,
243        lease_event::EVENT_EXPIRED,
244        now_ms,
245    )
246    .await?;
247
248    tx.commit().await.map_err(map_sqlx_error)?;
249    Ok(())
250}
251
252/// Read `policy.retry_policy.max_retries` out of exec_core `policy`
253/// jsonb. Defaults to 0 (no retries) when absent.
254fn extract_max_retries(policy: Option<&JsonValue>) -> u32 {
255    policy
256        .and_then(|v| v.get("retry_policy"))
257        .and_then(|rp| rp.get("max_retries"))
258        .and_then(|m| m.as_u64())
259        .and_then(|n| u32::try_from(n).ok())
260        .unwrap_or(0)
261}
262
263/// Apply [`ScannerFilter`] dimensions against `ff_exec_core.raw_fields`.
264/// Returns `true` when the candidate should be skipped.
265pub(crate) async fn skip_by_filter(
266    pool: &PgPool,
267    partition_key: i16,
268    exec_uuid: uuid::Uuid,
269    filter: &ScannerFilter,
270) -> bool {
271    if filter.is_noop() {
272        return false;
273    }
274    let row = sqlx::query(
275        r#"
276        SELECT raw_fields->>'namespace' AS ns,
277               raw_fields->'tags'       AS tags
278          FROM ff_exec_core
279         WHERE partition_key = $1 AND execution_id = $2
280        "#,
281    )
282    .bind(partition_key)
283    .bind(exec_uuid)
284    .fetch_optional(pool)
285    .await;
286    let Ok(Some(row)) = row else {
287        return true;
288    };
289    if let Some(ref want_ns) = filter.namespace {
290        let got: Option<String> = row.try_get("ns").ok().flatten();
291        if got.as_deref() != Some(want_ns.as_str()) {
292            return true;
293        }
294    }
295    if let Some((ref k, ref v)) = filter.instance_tag {
296        let tags: Option<JsonValue> = row.try_get("tags").ok();
297        let got = tags
298            .as_ref()
299            .and_then(|t| t.get(k))
300            .and_then(|t| t.as_str());
301        if got != Some(v.as_str()) {
302            return true;
303        }
304    }
305    false
306}