Skip to main content

ff_backend_postgres/
attempt.rs

1//! Attempt trait-method family — Postgres implementation.
2//!
3//! **RFC-v0.7 Wave 4b.** Bodies for `claim`, `claim_from_reclaim`,
4//! `renew`, `progress`, `complete`, `fail`, `delay`, `wait_children`.
5//!
6//! # Fence-triple invariants (RFC-003)
7//!
8//! Every RMW against `ff_attempt` is wrapped in a BEGIN/COMMIT + a
9//! `SELECT ... FOR UPDATE` of the target attempt row. The claim cascade
//! uses `FOR UPDATE SKIP LOCKED` on the candidate `ff_exec_core` row so two
11//! workers racing on the same lane cannot double-claim. Lease epoch is
12//! the authoritative fencing token: every terminal/progress/renew op
13//! first re-reads the attempt row under lock and compares
14//! `lease_epoch` against the handle's embedded epoch. Mismatch →
15//! `EngineError::Contention(LeaseConflict)`.
16//!
17//! # Isolation level (Q11)
18//!
19//! Default `READ COMMITTED`. The claim query uses `SKIP LOCKED` to
20//! keep the scanner contention-free; terminal ops use row-level
21//! `FOR UPDATE` on the already-identified attempt row. This is enough
22//! for the fence-triple invariant because the primary key on
23//! `ff_attempt` is `(partition_key, execution_id, attempt_index)` —
24//! the `FOR UPDATE` is partition-local and deterministic.
25//!
26//! # Isolation note on capability matching
27//!
28//! Capability subset-match runs in Rust *after* the FOR UPDATE SKIP
29//! LOCKED row is obtained. If the worker does not satisfy the row's
30//! required_capabilities, we `ROLLBACK` (releasing the lock) and
31//! return `Ok(None)` so another worker with the right caps can pick
32//! the row up. We do NOT try to re-scan within the same claim call
33//! — that would blow the blocking-wait budget in a pathological
34//! mismatch loop; the caller's retry cadence is the right scope.
35
36use ff_core::backend::{
37    BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38    HandleKind, LeaseRenewal, ReclaimToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51
52// ── helpers ─────────────────────────────────────────────────────────────
53
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`; a millisecond count
/// that does not fit in `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    match i64::try_from(elapsed_ms) {
        Ok(ms) => ms,
        Err(_) => i64::MAX,
    }
}
63
64/// Extract `(partition_index, uuid)` from an `ExecutionId` formatted
65/// `{fp:N}:<uuid>`.
66fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
67    let s = eid.as_str();
68    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
69        kind: ff_core::engine_error::ValidationKind::InvalidInput,
70        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
71    })?;
72    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
73        kind: ff_core::engine_error::ValidationKind::InvalidInput,
74        detail: format!("execution_id missing `}}:`: {s}"),
75    })?;
76    let part: i16 = rest[..close]
77        .parse()
78        .map_err(|_| EngineError::Validation {
79            kind: ff_core::engine_error::ValidationKind::InvalidInput,
80            detail: format!("execution_id partition index not u16: {s}"),
81        })?;
82    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
83        kind: ff_core::engine_error::ValidationKind::InvalidInput,
84        detail: format!("execution_id UUID invalid: {s}"),
85    })?;
86    Ok((part, uuid))
87}
88
89fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
90    if handle.backend != BackendTag::Postgres {
91        return Err(EngineError::Validation {
92            kind: ff_core::engine_error::ValidationKind::Corruption,
93            detail: format!(
94                "handle minted by {:?} passed to Postgres backend",
95                handle.backend
96            ),
97        });
98    }
99    let decoded = decode_opaque(&handle.opaque)?;
100    if decoded.tag != BackendTag::Postgres {
101        return Err(EngineError::Validation {
102            kind: ff_core::engine_error::ValidationKind::Corruption,
103            detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
104        });
105    }
106    Ok(decoded.payload)
107}
108
109fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
110    let op = encode_opaque(BackendTag::Postgres, &payload);
111    Handle::new(BackendTag::Postgres, kind, op)
112}
113
114// ── claim ───────────────────────────────────────────────────────────────
115
116pub(crate) async fn claim(
117    pool: &PgPool,
118    lane: &LaneId,
119    capabilities: &CapabilitySet,
120    policy: &ClaimPolicy,
121) -> Result<Option<Handle>, EngineError> {
122    // We scan each partition in random order. For Wave 4b we iterate
123    // partitions 0..256; a production path would use a sampled order +
124    // per-lane cache. Keeping the happy-path simple: the test fixture
125    // inserts into a known partition, so we scan all 256.
126    let total_partitions: i16 = 256;
127    for part in 0..total_partitions {
128        match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
129            Some(h) => return Ok(Some(h)),
130            None => continue,
131        }
132    }
133    Ok(None)
134}
135
136async fn try_claim_in_partition(
137    pool: &PgPool,
138    part: i16,
139    lane: &LaneId,
140    capabilities: &CapabilitySet,
141    policy: &ClaimPolicy,
142) -> Result<Option<Handle>, EngineError> {
143    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
144    // SELECT one eligible exec row in this partition/lane — FOR UPDATE
145    // SKIP LOCKED keeps contending workers from pile-ups.
146    let row = sqlx::query(
147        r#"
148        SELECT execution_id, required_capabilities, attempt_index
149          FROM ff_exec_core
150         WHERE partition_key = $1
151           AND lane_id = $2
152           AND lifecycle_phase = 'runnable'
153           AND eligibility_state = 'eligible_now'
154         ORDER BY priority DESC, created_at_ms ASC
155         FOR UPDATE SKIP LOCKED
156         LIMIT 1
157        "#,
158    )
159    .bind(part)
160    .bind(lane.as_str())
161    .fetch_optional(&mut *tx)
162    .await
163    .map_err(map_sqlx_error)?;
164
165    let Some(row) = row else {
166        // No candidate in this partition — release the tx.
167        tx.rollback().await.map_err(map_sqlx_error)?;
168        return Ok(None);
169    };
170
171    let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
172    let required_caps: Vec<String> = row
173        .try_get::<Vec<String>, _>("required_capabilities")
174        .map_err(map_sqlx_error)?;
175    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
176    let req = CapabilityRequirement::new(required_caps);
177    if !caps_matches(&req, capabilities) {
178        // Release the exec row lock; skip.
179        tx.rollback().await.map_err(map_sqlx_error)?;
180        return Ok(None);
181    }
182
183    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
184    let now = now_ms();
185    let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
186    let expires = now.saturating_add(lease_ttl_ms);
187
188    // UPSERT the attempt row: fresh lease epoch 1 on first claim; on
189    // a retry attempt the attempt_index is new so the PK doesn't
190    // collide. We always INSERT ON CONFLICT DO UPDATE to be safe.
191    sqlx::query(
192        r#"
193        INSERT INTO ff_attempt (
194            partition_key, execution_id, attempt_index,
195            worker_id, worker_instance_id,
196            lease_epoch, lease_expires_at_ms, started_at_ms
197        ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
198        ON CONFLICT (partition_key, execution_id, attempt_index)
199        DO UPDATE SET
200            worker_id = EXCLUDED.worker_id,
201            worker_instance_id = EXCLUDED.worker_instance_id,
202            lease_epoch = ff_attempt.lease_epoch + 1,
203            lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
204            started_at_ms = EXCLUDED.started_at_ms,
205            outcome = NULL
206        "#,
207    )
208    .bind(part)
209    .bind(exec_uuid)
210    .bind(attempt_index_i)
211    .bind(policy.worker_id.as_str())
212    .bind(policy.worker_instance_id.as_str())
213    .bind(expires)
214    .bind(now)
215    .execute(&mut *tx)
216    .await
217    .map_err(map_sqlx_error)?;
218
219    // Re-read the epoch we just wrote.
220    let epoch_row = sqlx::query(
221        r#"
222        SELECT lease_epoch FROM ff_attempt
223         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
224        "#,
225    )
226    .bind(part)
227    .bind(exec_uuid)
228    .bind(attempt_index_i)
229    .fetch_one(&mut *tx)
230    .await
231    .map_err(map_sqlx_error)?;
232    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
233
234    // Flip exec_core to active.
235    sqlx::query(
236        r#"
237        UPDATE ff_exec_core
238           SET lifecycle_phase = 'active',
239               ownership_state = 'leased',
240               eligibility_state = 'not_applicable',
241               attempt_state = 'running_attempt'
242         WHERE partition_key = $1 AND execution_id = $2
243        "#,
244    )
245    .bind(part)
246    .bind(exec_uuid)
247    .execute(&mut *tx)
248    .await
249    .map_err(map_sqlx_error)?;
250
251    tx.commit().await.map_err(map_sqlx_error)?;
252
253    let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
254        EngineError::Validation {
255            kind: ff_core::engine_error::ValidationKind::InvalidInput,
256            detail: format!("reassembling exec id: {e}"),
257        }
258    })?;
259    let payload = HandlePayload::new(
260        exec_id,
261        attempt_index,
262        AttemptId::new(),
263        LeaseId::new(),
264        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
265        u64::from(policy.lease_ttl_ms),
266        lane.clone(),
267        policy.worker_instance_id.clone(),
268    );
269    Ok(Some(mint_handle(payload, HandleKind::Fresh)))
270}
271
272// ── claim_from_reclaim ──────────────────────────────────────────────────
273
274pub(crate) async fn claim_from_reclaim(
275    pool: &PgPool,
276    token: ReclaimToken,
277) -> Result<Option<Handle>, EngineError> {
278    let eid = &token.grant.execution_id;
279    let (part, exec_uuid) = split_exec_id(eid)?;
280    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
281    // Lock the attempt row.
282    let row = sqlx::query(
283        r#"
284        SELECT attempt_index, lease_epoch, lease_expires_at_ms
285          FROM ff_attempt
286         WHERE partition_key = $1 AND execution_id = $2
287         ORDER BY attempt_index DESC
288         FOR UPDATE
289         LIMIT 1
290        "#,
291    )
292    .bind(part)
293    .bind(exec_uuid)
294    .fetch_optional(&mut *tx)
295    .await
296    .map_err(map_sqlx_error)?;
297    let Some(row) = row else {
298        tx.rollback().await.map_err(map_sqlx_error)?;
299        return Err(EngineError::NotFound { entity: "attempt" });
300    };
301    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
302    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
303    let expires_at: Option<i64> = row
304        .try_get::<Option<i64>, _>("lease_expires_at_ms")
305        .map_err(map_sqlx_error)?;
306
307    // Live-lease check. A valid reclaim requires the prior lease
308    // to have expired (lease_expires_at_ms <= now) OR to be NULL
309    // (released).
310    let now = now_ms();
311    let live = matches!(expires_at, Some(exp) if exp > now);
312    if live {
313        tx.rollback().await.map_err(map_sqlx_error)?;
314        return Ok(None); // caller sees `None` — documented below is LeaseConflict; but per trait shape Ok(None) means "grant no longer available". Use Contention when semantics demand hard signal.
315    }
316
317    // Bump epoch + install new worker + fresh expiry.
318    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
319    let new_expires = now.saturating_add(lease_ttl_ms);
320    sqlx::query(
321        r#"
322        UPDATE ff_attempt
323           SET worker_id = $1,
324               worker_instance_id = $2,
325               lease_epoch = lease_epoch + 1,
326               lease_expires_at_ms = $3,
327               started_at_ms = $4,
328               outcome = NULL
329         WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
330        "#,
331    )
332    .bind(token.worker_id.as_str())
333    .bind(token.worker_instance_id.as_str())
334    .bind(new_expires)
335    .bind(now)
336    .bind(part)
337    .bind(exec_uuid)
338    .bind(attempt_index_i)
339    .execute(&mut *tx)
340    .await
341    .map_err(map_sqlx_error)?;
342
343    sqlx::query(
344        r#"
345        UPDATE ff_exec_core
346           SET lifecycle_phase = 'active',
347               ownership_state = 'leased',
348               eligibility_state = 'not_applicable',
349               attempt_state = 'running_attempt'
350         WHERE partition_key = $1 AND execution_id = $2
351        "#,
352    )
353    .bind(part)
354    .bind(exec_uuid)
355    .execute(&mut *tx)
356    .await
357    .map_err(map_sqlx_error)?;
358
359    tx.commit().await.map_err(map_sqlx_error)?;
360
361    let new_epoch = current_epoch.saturating_add(1);
362    let payload = HandlePayload::new(
363        eid.clone(),
364        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
365        AttemptId::new(),
366        LeaseId::new(),
367        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
368        u64::from(token.lease_ttl_ms),
369        token.grant.lane_id.clone(),
370        token.worker_instance_id.clone(),
371    );
372    Ok(Some(mint_handle(payload, HandleKind::Resumed)))
373}
374
375// ── fence check ─────────────────────────────────────────────────────────
376
377/// Re-read the attempt row under FOR UPDATE + validate the handle's
378/// `lease_epoch` matches. Returns the locked row's
379/// `(attempt_index, lease_expires_at_ms)` for callers that need them
380/// for post-update logic. Fence mismatch → `Contention(LeaseConflict)`.
381async fn fence_check<'c>(
382    tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
383    part: i16,
384    exec_uuid: Uuid,
385    attempt_index: i32,
386    expected_epoch: u64,
387) -> Result<(), EngineError> {
388    let row = sqlx::query(
389        r#"
390        SELECT lease_epoch FROM ff_attempt
391         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
392         FOR UPDATE
393        "#,
394    )
395    .bind(part)
396    .bind(exec_uuid)
397    .bind(attempt_index)
398    .fetch_optional(&mut **tx)
399    .await
400    .map_err(map_sqlx_error)?;
401    let Some(row) = row else {
402        return Err(EngineError::NotFound { entity: "attempt" });
403    };
404    let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
405    let observed = u64::try_from(epoch_i).unwrap_or(0);
406    if observed != expected_epoch {
407        return Err(EngineError::Contention(ContentionKind::LeaseConflict));
408    }
409    Ok(())
410}
411
412// ── renew ───────────────────────────────────────────────────────────────
413
414pub(crate) async fn renew(
415    pool: &PgPool,
416    handle: &Handle,
417) -> Result<LeaseRenewal, EngineError> {
418    let payload = decode_handle(handle)?;
419    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
420    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
421    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
422    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
423    let now = now_ms();
424    let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
425    sqlx::query(
426        r#"
427        UPDATE ff_attempt
428           SET lease_expires_at_ms = $1
429         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
430        "#,
431    )
432    .bind(new_expires)
433    .bind(part)
434    .bind(exec_uuid)
435    .bind(attempt_index)
436    .execute(&mut *tx)
437    .await
438    .map_err(map_sqlx_error)?;
439    tx.commit().await.map_err(map_sqlx_error)?;
440    Ok(LeaseRenewal::new(
441        u64::try_from(new_expires).unwrap_or(0),
442        payload.lease_epoch.0,
443    ))
444}
445
446// ── progress ────────────────────────────────────────────────────────────
447
448pub(crate) async fn progress(
449    pool: &PgPool,
450    handle: &Handle,
451    percent: Option<u8>,
452    message: Option<String>,
453) -> Result<(), EngineError> {
454    let payload = decode_handle(handle)?;
455    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
456    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
457    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
458    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
459    // Stash progress on the exec_core raw_fields jsonb. raw_fields is
460    // the Wave-3 contract for "fields that don't yet have a typed
461    // column" — progress_pct + progress_message live here until a
462    // follow-up migration promotes them. This preserves the op's
463    // observable side effect (caller can read it back) without
464    // forking the schema.
465    let mut patch = serde_json::Map::new();
466    if let Some(pct) = percent {
467        patch.insert("progress_pct".into(), serde_json::Value::from(pct));
468    }
469    if let Some(msg) = message {
470        patch.insert("progress_message".into(), serde_json::Value::from(msg));
471    }
472    let patch_val = serde_json::Value::Object(patch);
473    sqlx::query(
474        r#"
475        UPDATE ff_exec_core
476           SET raw_fields = raw_fields || $1::jsonb
477         WHERE partition_key = $2 AND execution_id = $3
478        "#,
479    )
480    .bind(patch_val)
481    .bind(part)
482    .bind(exec_uuid)
483    .execute(&mut *tx)
484    .await
485    .map_err(map_sqlx_error)?;
486    tx.commit().await.map_err(map_sqlx_error)?;
487    Ok(())
488}
489
490// ── complete ────────────────────────────────────────────────────────────
491
492pub(crate) async fn complete(
493    pool: &PgPool,
494    handle: &Handle,
495    payload_bytes: Option<Vec<u8>>,
496) -> Result<(), EngineError> {
497    let payload = decode_handle(handle)?;
498    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
499    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
500    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
501    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
502    let now = now_ms();
503
504    sqlx::query(
505        r#"
506        UPDATE ff_attempt
507           SET terminal_at_ms = $1,
508               outcome = 'success',
509               lease_expires_at_ms = NULL
510         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
511        "#,
512    )
513    .bind(now)
514    .bind(part)
515    .bind(exec_uuid)
516    .bind(attempt_index)
517    .execute(&mut *tx)
518    .await
519    .map_err(map_sqlx_error)?;
520
521    sqlx::query(
522        r#"
523        UPDATE ff_exec_core
524           SET lifecycle_phase = 'terminal',
525               ownership_state = 'unowned',
526               eligibility_state = 'not_applicable',
527               attempt_state = 'attempt_terminal',
528               terminal_at_ms = $1,
529               result = $2
530         WHERE partition_key = $3 AND execution_id = $4
531        "#,
532    )
533    .bind(now)
534    .bind(payload_bytes.as_deref())
535    .bind(part)
536    .bind(exec_uuid)
537    .execute(&mut *tx)
538    .await
539    .map_err(map_sqlx_error)?;
540
541    // Outbox: emit completion event → trigger fires pg_notify.
542    sqlx::query(
543        r#"
544        INSERT INTO ff_completion_event (
545            partition_key, execution_id, flow_id, outcome,
546            namespace, instance_tag, occurred_at_ms
547        )
548        SELECT partition_key, execution_id, flow_id, 'success',
549               NULL, NULL, $3
550          FROM ff_exec_core
551         WHERE partition_key = $1 AND execution_id = $2
552        "#,
553    )
554    .bind(part)
555    .bind(exec_uuid)
556    .bind(now)
557    .execute(&mut *tx)
558    .await
559    .map_err(map_sqlx_error)?;
560
561    tx.commit().await.map_err(map_sqlx_error)?;
562    Ok(())
563}
564
565// ── fail ────────────────────────────────────────────────────────────────
566
567pub(crate) async fn fail(
568    pool: &PgPool,
569    handle: &Handle,
570    reason: FailureReason,
571    classification: FailureClass,
572) -> Result<FailOutcome, EngineError> {
573    let payload = decode_handle(handle)?;
574    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
575    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
576    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
577    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
578    let now = now_ms();
579    let retryable = matches!(
580        classification,
581        FailureClass::Transient | FailureClass::InfraCrash
582    );
583
584    if retryable {
585        // Retry: re-enqueue the exec, release lease, bump attempt_index.
586        sqlx::query(
587            r#"
588            UPDATE ff_attempt
589               SET terminal_at_ms = $1,
590                   outcome = 'retry',
591                   lease_expires_at_ms = NULL
592             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
593            "#,
594        )
595        .bind(now)
596        .bind(part)
597        .bind(exec_uuid)
598        .bind(attempt_index)
599        .execute(&mut *tx)
600        .await
601        .map_err(map_sqlx_error)?;
602
603        sqlx::query(
604            r#"
605            UPDATE ff_exec_core
606               SET lifecycle_phase = 'runnable',
607                   ownership_state = 'unowned',
608                   eligibility_state = 'eligible_now',
609                   attempt_state = 'pending_retry_attempt',
610                   attempt_index = attempt_index + 1,
611                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
612             WHERE partition_key = $2 AND execution_id = $3
613            "#,
614        )
615        .bind(&reason.message)
616        .bind(part)
617        .bind(exec_uuid)
618        .execute(&mut *tx)
619        .await
620        .map_err(map_sqlx_error)?;
621
622        tx.commit().await.map_err(map_sqlx_error)?;
623        Ok(FailOutcome::RetryScheduled {
624            delay_until: TimestampMs::from_millis(now),
625        })
626    } else {
627        // Terminal-failed.
628        sqlx::query(
629            r#"
630            UPDATE ff_attempt
631               SET terminal_at_ms = $1,
632                   outcome = 'failed',
633                   lease_expires_at_ms = NULL
634             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
635            "#,
636        )
637        .bind(now)
638        .bind(part)
639        .bind(exec_uuid)
640        .bind(attempt_index)
641        .execute(&mut *tx)
642        .await
643        .map_err(map_sqlx_error)?;
644
645        sqlx::query(
646            r#"
647            UPDATE ff_exec_core
648               SET lifecycle_phase = 'terminal',
649                   ownership_state = 'unowned',
650                   eligibility_state = 'not_applicable',
651                   attempt_state = 'attempt_terminal',
652                   terminal_at_ms = $1,
653                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
654             WHERE partition_key = $3 AND execution_id = $4
655            "#,
656        )
657        .bind(now)
658        .bind(&reason.message)
659        .bind(part)
660        .bind(exec_uuid)
661        .execute(&mut *tx)
662        .await
663        .map_err(map_sqlx_error)?;
664
665        sqlx::query(
666            r#"
667            INSERT INTO ff_completion_event (
668                partition_key, execution_id, flow_id, outcome,
669                namespace, instance_tag, occurred_at_ms
670            )
671            SELECT partition_key, execution_id, flow_id, 'failed',
672                   NULL, NULL, $3
673              FROM ff_exec_core
674             WHERE partition_key = $1 AND execution_id = $2
675            "#,
676        )
677        .bind(part)
678        .bind(exec_uuid)
679        .bind(now)
680        .execute(&mut *tx)
681        .await
682        .map_err(map_sqlx_error)?;
683
684        tx.commit().await.map_err(map_sqlx_error)?;
685        Ok(FailOutcome::TerminalFailed)
686    }
687}
688
689// ── delay ───────────────────────────────────────────────────────────────
690
691pub(crate) async fn delay(
692    pool: &PgPool,
693    handle: &Handle,
694    delay_until: TimestampMs,
695) -> Result<(), EngineError> {
696    let payload = decode_handle(handle)?;
697    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
698    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
699    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
700    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
701
702    sqlx::query(
703        r#"
704        UPDATE ff_attempt
705           SET outcome = 'interrupted',
706               lease_expires_at_ms = NULL
707         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
708        "#,
709    )
710    .bind(part)
711    .bind(exec_uuid)
712    .bind(attempt_index)
713    .execute(&mut *tx)
714    .await
715    .map_err(map_sqlx_error)?;
716
717    sqlx::query(
718        r#"
719        UPDATE ff_exec_core
720           SET lifecycle_phase = 'runnable',
721               ownership_state = 'unowned',
722               eligibility_state = 'not_eligible_until_time',
723               attempt_state = 'attempt_interrupted',
724               deadline_at_ms = $1
725         WHERE partition_key = $2 AND execution_id = $3
726        "#,
727    )
728    .bind(delay_until.0)
729    .bind(part)
730    .bind(exec_uuid)
731    .execute(&mut *tx)
732    .await
733    .map_err(map_sqlx_error)?;
734
735    tx.commit().await.map_err(map_sqlx_error)?;
736    Ok(())
737}
738
739// ── wait_children ───────────────────────────────────────────────────────
740
741pub(crate) async fn wait_children(
742    pool: &PgPool,
743    handle: &Handle,
744) -> Result<(), EngineError> {
745    let payload = decode_handle(handle)?;
746    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
747    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
748    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
749    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
750
751    sqlx::query(
752        r#"
753        UPDATE ff_attempt
754           SET outcome = 'waiting_children',
755               lease_expires_at_ms = NULL
756         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
757        "#,
758    )
759    .bind(part)
760    .bind(exec_uuid)
761    .bind(attempt_index)
762    .execute(&mut *tx)
763    .await
764    .map_err(map_sqlx_error)?;
765
766    sqlx::query(
767        r#"
768        UPDATE ff_exec_core
769           SET lifecycle_phase = 'runnable',
770               ownership_state = 'unowned',
771               eligibility_state = 'blocked_by_dependencies',
772               attempt_state = 'attempt_interrupted',
773               blocking_reason = 'waiting_for_children'
774         WHERE partition_key = $1 AND execution_id = $2
775        "#,
776    )
777    .bind(part)
778    .bind(exec_uuid)
779    .execute(&mut *tx)
780    .await
781    .map_err(map_sqlx_error)?;
782
783    tx.commit().await.map_err(map_sqlx_error)?;
784    Ok(())
785}
786