Skip to main content

ff_backend_postgres/reconcilers/
execution_deadline.rs

1//! Postgres `execution_deadline` reconciler (RFC-020 Wave 9 minimal).
2//!
3//! Mirrors the `ExecutionDeadline` phase of
4//! [`ff_engine::scanner::execution_deadline`] — terminates executions
5//! whose whole-execution wall-clock deadline has elapsed. Shares a
6//! trait entry point (`expire_execution`) with `attempt_timeout`;
7//! discriminated by [`ff_core::engine_backend::ExpirePhase`].
8//!
9//! Candidate selection: `(lifecycle_phase = 'active', deadline_at_ms
10//! IS NOT NULL, deadline_at_ms < now)` on `ff_exec_core`. The
11//! `deadline_at_ms` column is overloaded (it also carries
12//! `delay_until` for rows parked by `attempt::delay()`); the
13//! `lifecycle_phase = 'active'` predicate discriminates.
14//!
15//! Action: terminal-fail with `last_failure_message =
16//! 'execution_deadline'`, clear the active attempt's lease, emit
17//! `ff_completion_event { outcome = 'expired' }`.
18
19use ff_core::engine_error::EngineError;
20use sqlx::{PgPool, Row};
21
22use crate::error::map_sqlx_error;
23use crate::lease_event;
24
25/// Per-execution expire action for the execution-deadline scanner.
26/// Exposed so the `EngineBackend::expire_execution` trait dispatch
27/// (`ExpirePhase::ExecutionDeadline`) can invoke the same per-row tx
28/// logic the batch scanner would. Silently no-ops when the exec is
29/// no longer `active` or its deadline has not yet elapsed (re-checked
30/// under `FOR UPDATE`).
31pub async fn expire_for_execution(
32    pool: &PgPool,
33    partition_key: i16,
34    exec_uuid: uuid::Uuid,
35    now_ms: i64,
36) -> Result<(), EngineError> {
37    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
38
39    let core_row = sqlx::query(
40        r#"
41        SELECT attempt_index, lifecycle_phase, deadline_at_ms
42          FROM ff_exec_core
43         WHERE partition_key = $1 AND execution_id = $2
44         FOR UPDATE
45        "#,
46    )
47    .bind(partition_key)
48    .bind(exec_uuid)
49    .fetch_optional(&mut *tx)
50    .await
51    .map_err(map_sqlx_error)?;
52
53    let Some(core) = core_row else {
54        tx.rollback().await.map_err(map_sqlx_error)?;
55        return Ok(());
56    };
57    let phase: String = core.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
58    let deadline_at: Option<i64> = core
59        .try_get::<Option<i64>, _>("deadline_at_ms")
60        .map_err(map_sqlx_error)?;
61    if phase != "active" || !matches!(deadline_at, Some(d) if d < now_ms) {
62        tx.rollback().await.map_err(map_sqlx_error)?;
63        return Ok(());
64    }
65    let attempt_index: i32 = core.try_get("attempt_index").map_err(map_sqlx_error)?;
66
67    // Release the active attempt's lease + mark it interrupted. The
68    // row may legitimately lack a lease (e.g. exec is active but its
69    // current attempt is mid-transition) — UPDATE affecting 0 rows is
70    // fine.
71    sqlx::query(
72        r#"
73        UPDATE ff_attempt
74           SET outcome = 'interrupted',
75               lease_expires_at_ms = NULL,
76               terminal_at_ms = $1
77         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
78        "#,
79    )
80    .bind(now_ms)
81    .bind(partition_key)
82    .bind(exec_uuid)
83    .bind(attempt_index)
84    .execute(&mut *tx)
85    .await
86    .map_err(map_sqlx_error)?;
87
88    sqlx::query(
89        r#"
90        UPDATE ff_exec_core
91           SET lifecycle_phase = 'terminal',
92               ownership_state = 'unowned',
93               eligibility_state = 'not_applicable',
94               attempt_state = 'attempt_terminal',
95               terminal_at_ms = $1,
96               raw_fields = raw_fields || jsonb_build_object(
97                   'last_failure_message', 'execution_deadline'
98               )
99         WHERE partition_key = $2 AND execution_id = $3
100        "#,
101    )
102    .bind(now_ms)
103    .bind(partition_key)
104    .bind(exec_uuid)
105    .execute(&mut *tx)
106    .await
107    .map_err(map_sqlx_error)?;
108
109    sqlx::query(
110        r#"
111        INSERT INTO ff_completion_event (
112            partition_key, execution_id, flow_id, outcome,
113            namespace, instance_tag, occurred_at_ms
114        )
115        SELECT partition_key, execution_id, flow_id, 'expired',
116               NULL, NULL, $3
117          FROM ff_exec_core
118         WHERE partition_key = $1 AND execution_id = $2
119        "#,
120    )
121    .bind(partition_key)
122    .bind(exec_uuid)
123    .bind(now_ms)
124    .execute(&mut *tx)
125    .await
126    .map_err(map_sqlx_error)?;
127
128    // RFC-019 Stage B outbox: lease expired (execution_deadline
129    // reconciler). Matches the `attempt_timeout` terminal emit.
130    lease_event::emit(
131        &mut tx,
132        partition_key,
133        exec_uuid,
134        None,
135        lease_event::EVENT_EXPIRED,
136        now_ms,
137    )
138    .await?;
139
140    tx.commit().await.map_err(map_sqlx_error)?;
141    Ok(())
142}