Skip to main content

ff_backend_postgres/reconcilers/
attempt_timeout.rs

1//! Postgres `attempt_timeout` reconciler (wave 6c).
2//!
3//! Mirrors [`ff_engine::scanner::attempt_timeout`] against the Postgres
4//! schema. The Valkey side scans `ff:idx:{p:N}:attempt_timeout` ZSET
5//! for attempts whose lease timeout has lapsed and fires
6//! `FCALL ff_expire_execution`. On Postgres the authority is the
7//! `ff_attempt.lease_expires_at_ms` column (partial index
8//! `ff_attempt_lease_expiry_idx`); an attempt is "timed out" iff its
9//! lease has expired AND the `ff_exec_core.lifecycle_phase = 'active'`
10//! (a released lease — `lease_expires_at_ms IS NULL` — is not a
11//! timeout; a terminal attempt is also not a timeout).
12//!
13//! Per-row tx: each timeout action (mark attempt interrupted + transition
14//! exec_core to either `runnable` (retries remain) or `terminal`
15//! (exhausted) + emit completion event on terminal) is its own
16//! `BEGIN/COMMIT`, per spec §Constraints.
17
18use ff_core::backend::ScannerFilter;
19use ff_core::engine_error::EngineError;
20use serde_json::Value as JsonValue;
21use sqlx::{PgPool, Row};
22
23use crate::error::map_sqlx_error;
24use crate::reconcilers::ScanReport;
25
/// Max rows pulled per scan tick; bound as the `LIMIT` of the candidate
/// query in `scan_tick`.
const BATCH_SIZE: i64 = 1000;

/// Grace period (ms). `scan_tick` subtracts it from the current time to form
/// the expiry cutoff, so a lease only qualifies once it lies at least this
/// far in the past. 0 by default.
const GRACE_MS: i64 = 0;
31
32/// Scan one partition for timed-out attempt leases.
33pub async fn scan_tick(
34    pool: &PgPool,
35    partition_key: i16,
36    filter: &ScannerFilter,
37) -> Result<ScanReport, EngineError> {
38    let now_ms: i64 = i64::try_from(
39        std::time::SystemTime::now()
40            .duration_since(std::time::UNIX_EPOCH)
41            .map(|d| d.as_millis())
42            .unwrap_or(0),
43    )
44    .unwrap_or(i64::MAX);
45    let cutoff = now_ms.saturating_sub(GRACE_MS);
46
47    let rows = sqlx::query(
48        r#"
49        SELECT a.execution_id, a.attempt_index
50          FROM ff_attempt a
51          JOIN ff_exec_core c
52            ON c.partition_key = a.partition_key
53           AND c.execution_id = a.execution_id
54         WHERE a.partition_key = $1
55           AND a.lease_expires_at_ms IS NOT NULL
56           AND a.lease_expires_at_ms < $2
57           AND c.lifecycle_phase = 'active'
58         ORDER BY a.lease_expires_at_ms ASC
59         LIMIT $3
60        "#,
61    )
62    .bind(partition_key)
63    .bind(cutoff)
64    .bind(BATCH_SIZE)
65    .fetch_all(pool)
66    .await
67    .map_err(map_sqlx_error)?;
68
69    let mut report = ScanReport::default();
70    for row in rows {
71        let exec_uuid: uuid::Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
72        let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
73        if skip_by_filter(pool, partition_key, exec_uuid, filter).await {
74            continue;
75        }
76        match expire_one(pool, partition_key, exec_uuid, attempt_index, now_ms).await {
77            Ok(()) => report.processed += 1,
78            Err(e) => {
79                tracing::warn!(
80                    partition = partition_key,
81                    execution = %exec_uuid,
82                    attempt_index,
83                    error = %e,
84                    "attempt_timeout reconciler: row expiry failed",
85                );
86                report.errors += 1;
87            }
88        }
89    }
90    Ok(report)
91}
92
93async fn expire_one(
94    pool: &PgPool,
95    partition_key: i16,
96    exec_uuid: uuid::Uuid,
97    attempt_index: i32,
98    now_ms: i64,
99) -> Result<(), EngineError> {
100    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
101
102    let att_row = sqlx::query(
103        r#"
104        SELECT lease_expires_at_ms
105          FROM ff_attempt
106         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
107         FOR UPDATE
108        "#,
109    )
110    .bind(partition_key)
111    .bind(exec_uuid)
112    .bind(attempt_index)
113    .fetch_optional(&mut *tx)
114    .await
115    .map_err(map_sqlx_error)?;
116
117    let Some(att) = att_row else {
118        tx.rollback().await.map_err(map_sqlx_error)?;
119        return Ok(());
120    };
121    let expires_at: Option<i64> = att
122        .try_get::<Option<i64>, _>("lease_expires_at_ms")
123        .map_err(map_sqlx_error)?;
124    if !matches!(expires_at, Some(e) if e < now_ms) {
125        tx.rollback().await.map_err(map_sqlx_error)?;
126        return Ok(());
127    }
128
129    let core_row = sqlx::query(
130        r#"
131        SELECT attempt_index, lifecycle_phase, policy
132          FROM ff_exec_core
133         WHERE partition_key = $1 AND execution_id = $2
134         FOR UPDATE
135        "#,
136    )
137    .bind(partition_key)
138    .bind(exec_uuid)
139    .fetch_optional(&mut *tx)
140    .await
141    .map_err(map_sqlx_error)?;
142
143    let Some(core) = core_row else {
144        tx.rollback().await.map_err(map_sqlx_error)?;
145        return Ok(());
146    };
147    let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
148    if phase != "active" {
149        tx.rollback().await.map_err(map_sqlx_error)?;
150        return Ok(());
151    }
152    let cur_attempt: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
153    let policy: Option<JsonValue> = core.try_get("policy").map_err(map_sqlx_error)?;
154    let max_retries = extract_max_retries(policy.as_ref());
155    let retries_remain = (cur_attempt as i64) < (max_retries as i64);
156
157    sqlx::query(
158        r#"
159        UPDATE ff_attempt
160           SET outcome = 'interrupted',
161               lease_expires_at_ms = NULL,
162               terminal_at_ms = $1
163         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
164        "#,
165    )
166    .bind(now_ms)
167    .bind(partition_key)
168    .bind(exec_uuid)
169    .bind(attempt_index)
170    .execute(&mut *tx)
171    .await
172    .map_err(map_sqlx_error)?;
173
174    if retries_remain {
175        sqlx::query(
176            r#"
177            UPDATE ff_exec_core
178               SET lifecycle_phase = 'runnable',
179                   ownership_state = 'unowned',
180                   eligibility_state = 'eligible_now',
181                   attempt_state = 'pending_retry_attempt',
182                   attempt_index = attempt_index + 1,
183                   raw_fields = raw_fields || jsonb_build_object(
184                       'last_failure_message', 'attempt_timeout'
185                   )
186             WHERE partition_key = $1 AND execution_id = $2
187            "#,
188        )
189        .bind(partition_key)
190        .bind(exec_uuid)
191        .execute(&mut *tx)
192        .await
193        .map_err(map_sqlx_error)?;
194    } else {
195        sqlx::query(
196            r#"
197            UPDATE ff_exec_core
198               SET lifecycle_phase = 'terminal',
199                   ownership_state = 'unowned',
200                   eligibility_state = 'not_applicable',
201                   attempt_state = 'attempt_terminal',
202                   terminal_at_ms = $1,
203                   raw_fields = raw_fields || jsonb_build_object(
204                       'last_failure_message', 'attempt_timeout'
205                   )
206             WHERE partition_key = $2 AND execution_id = $3
207            "#,
208        )
209        .bind(now_ms)
210        .bind(partition_key)
211        .bind(exec_uuid)
212        .execute(&mut *tx)
213        .await
214        .map_err(map_sqlx_error)?;
215
216        sqlx::query(
217            r#"
218            INSERT INTO ff_completion_event (
219                partition_key, execution_id, flow_id, outcome,
220                namespace, instance_tag, occurred_at_ms
221            )
222            SELECT partition_key, execution_id, flow_id, 'expired',
223                   NULL, NULL, $3
224              FROM ff_exec_core
225             WHERE partition_key = $1 AND execution_id = $2
226            "#,
227        )
228        .bind(partition_key)
229        .bind(exec_uuid)
230        .bind(now_ms)
231        .execute(&mut *tx)
232        .await
233        .map_err(map_sqlx_error)?;
234    }
235
236    tx.commit().await.map_err(map_sqlx_error)?;
237    Ok(())
238}
239
240/// Read `policy.retry_policy.max_retries` out of exec_core `policy`
241/// jsonb. Defaults to 0 (no retries) when absent.
242fn extract_max_retries(policy: Option<&JsonValue>) -> u32 {
243    policy
244        .and_then(|v| v.get("retry_policy"))
245        .and_then(|rp| rp.get("max_retries"))
246        .and_then(|m| m.as_u64())
247        .and_then(|n| u32::try_from(n).ok())
248        .unwrap_or(0)
249}
250
251/// Apply [`ScannerFilter`] dimensions against `ff_exec_core.raw_fields`.
252/// Returns `true` when the candidate should be skipped.
253pub(crate) async fn skip_by_filter(
254    pool: &PgPool,
255    partition_key: i16,
256    exec_uuid: uuid::Uuid,
257    filter: &ScannerFilter,
258) -> bool {
259    if filter.is_noop() {
260        return false;
261    }
262    let row = sqlx::query(
263        r#"
264        SELECT raw_fields->>'namespace' AS ns,
265               raw_fields->'tags'       AS tags
266          FROM ff_exec_core
267         WHERE partition_key = $1 AND execution_id = $2
268        "#,
269    )
270    .bind(partition_key)
271    .bind(exec_uuid)
272    .fetch_optional(pool)
273    .await;
274    let Ok(Some(row)) = row else {
275        return true;
276    };
277    if let Some(ref want_ns) = filter.namespace {
278        let got: Option<String> = row.try_get("ns").ok().flatten();
279        if got.as_deref() != Some(want_ns.as_str()) {
280            return true;
281        }
282    }
283    if let Some((ref k, ref v)) = filter.instance_tag {
284        let tags: Option<JsonValue> = row.try_get("tags").ok();
285        let got = tags
286            .as_ref()
287            .and_then(|t| t.get(k))
288            .and_then(|t| t.as_str());
289        if got != Some(v.as_str()) {
290            return true;
291        }
292    }
293    false
294}