Skip to main content

ff_backend_postgres/
attempt.rs

1//! Attempt trait-method family — Postgres implementation.
2//!
3//! **RFC-v0.7 Wave 4b.** Bodies for `claim`, `claim_from_resume_grant`,
4//! `renew`, `progress`, `complete`, `fail`, `delay`, `wait_children`.
5//!
6//! # Fence-triple invariants (RFC-003)
7//!
8//! Every RMW against `ff_attempt` is wrapped in a BEGIN/COMMIT + a
9//! `SELECT ... FOR UPDATE` of the target attempt row. The claim cascade
10//! uses `FOR UPDATE SKIP LOCKED` on the candidate `ff_exec_core` row so two
11//! workers racing on the same lane cannot double-claim. Lease epoch is
12//! the authoritative fencing token: every terminal/progress/renew op
13//! first re-reads the attempt row under lock and compares
14//! `lease_epoch` against the handle's embedded epoch. Mismatch →
15//! `EngineError::Contention(LeaseConflict)`.
16//!
17//! # Isolation level (Q11)
18//!
19//! Default `READ COMMITTED`. The claim query uses `SKIP LOCKED` to
20//! keep the scanner contention-free; terminal ops use row-level
21//! `FOR UPDATE` on the already-identified attempt row. This is enough
22//! for the fence-triple invariant because the primary key on
23//! `ff_attempt` is `(partition_key, execution_id, attempt_index)` —
24//! the `FOR UPDATE` is partition-local and deterministic.
25//!
26//! # Isolation note on capability matching
27//!
28//! Capability subset-match runs in Rust *after* the FOR UPDATE SKIP
29//! LOCKED row is obtained. If the worker does not satisfy the row's
30//! required_capabilities, we `ROLLBACK` (releasing the lock) and
31//! return `Ok(None)` so another worker with the right caps can pick
32//! the row up. We do NOT try to re-scan within the same claim call
33//! — that would blow the blocking-wait budget in a pathological
34//! mismatch loop; the caller's retry cadence is the right scope.
35
36use ff_core::backend::{
37    BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38    HandleKind, LeaseRenewal, ResumeToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51use crate::lease_event;
52
53// ── helpers ─────────────────────────────────────────────────────────────
54
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch yields `0`; a millisecond
/// count that does not fit in `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    match i64::try_from(elapsed_ms) {
        Ok(ms) => ms,
        Err(_) => i64::MAX,
    }
}
64
65/// Extract `(partition_index, uuid)` from an `ExecutionId` formatted
66/// `{fp:N}:<uuid>`.
67pub(crate) fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
68    let s = eid.as_str();
69    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
70        kind: ff_core::engine_error::ValidationKind::InvalidInput,
71        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
72    })?;
73    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
74        kind: ff_core::engine_error::ValidationKind::InvalidInput,
75        detail: format!("execution_id missing `}}:`: {s}"),
76    })?;
77    let part: i16 = rest[..close]
78        .parse()
79        .map_err(|_| EngineError::Validation {
80            kind: ff_core::engine_error::ValidationKind::InvalidInput,
81            detail: format!("execution_id partition index not u16: {s}"),
82        })?;
83    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
84        kind: ff_core::engine_error::ValidationKind::InvalidInput,
85        detail: format!("execution_id UUID invalid: {s}"),
86    })?;
87    Ok((part, uuid))
88}
89
90fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
91    if handle.backend != BackendTag::Postgres {
92        return Err(EngineError::Validation {
93            kind: ff_core::engine_error::ValidationKind::Corruption,
94            detail: format!(
95                "handle minted by {:?} passed to Postgres backend",
96                handle.backend
97            ),
98        });
99    }
100    let decoded = decode_opaque(&handle.opaque)?;
101    if decoded.tag != BackendTag::Postgres {
102        return Err(EngineError::Validation {
103            kind: ff_core::engine_error::ValidationKind::Corruption,
104            detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
105        });
106    }
107    Ok(decoded.payload)
108}
109
110fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
111    let op = encode_opaque(BackendTag::Postgres, &payload);
112    Handle::new(BackendTag::Postgres, kind, op)
113}
114
115// ── claim ───────────────────────────────────────────────────────────────
116
117pub(crate) async fn claim(
118    pool: &PgPool,
119    lane: &LaneId,
120    capabilities: &CapabilitySet,
121    policy: &ClaimPolicy,
122) -> Result<Option<Handle>, EngineError> {
123    // We scan each partition in random order. For Wave 4b we iterate
124    // partitions 0..256; a production path would use a sampled order +
125    // per-lane cache. Keeping the happy-path simple: the test fixture
126    // inserts into a known partition, so we scan all 256.
127    let total_partitions: i16 = 256;
128    for part in 0..total_partitions {
129        match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
130            Some(h) => return Ok(Some(h)),
131            None => continue,
132        }
133    }
134    Ok(None)
135}
136
137async fn try_claim_in_partition(
138    pool: &PgPool,
139    part: i16,
140    lane: &LaneId,
141    capabilities: &CapabilitySet,
142    policy: &ClaimPolicy,
143) -> Result<Option<Handle>, EngineError> {
144    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
145    // SELECT one eligible exec row in this partition/lane — FOR UPDATE
146    // SKIP LOCKED keeps contending workers from pile-ups.
147    let row = sqlx::query(
148        r#"
149        SELECT execution_id, required_capabilities, attempt_index
150          FROM ff_exec_core
151         WHERE partition_key = $1
152           AND lane_id = $2
153           AND lifecycle_phase = 'runnable'
154           AND eligibility_state = 'eligible_now'
155         ORDER BY priority DESC, created_at_ms ASC
156         FOR UPDATE SKIP LOCKED
157         LIMIT 1
158        "#,
159    )
160    .bind(part)
161    .bind(lane.as_str())
162    .fetch_optional(&mut *tx)
163    .await
164    .map_err(map_sqlx_error)?;
165
166    let Some(row) = row else {
167        // No candidate in this partition — release the tx.
168        tx.rollback().await.map_err(map_sqlx_error)?;
169        return Ok(None);
170    };
171
172    let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
173    let required_caps: Vec<String> = row
174        .try_get::<Vec<String>, _>("required_capabilities")
175        .map_err(map_sqlx_error)?;
176    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
177    let req = CapabilityRequirement::new(required_caps);
178    if !caps_matches(&req, capabilities) {
179        // Release the exec row lock; skip.
180        tx.rollback().await.map_err(map_sqlx_error)?;
181        return Ok(None);
182    }
183
184    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
185    let now = now_ms();
186    let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
187    let expires = now.saturating_add(lease_ttl_ms);
188
189    // UPSERT the attempt row: fresh lease epoch 1 on first claim; on
190    // a retry attempt the attempt_index is new so the PK doesn't
191    // collide. We always INSERT ON CONFLICT DO UPDATE to be safe.
192    sqlx::query(
193        r#"
194        INSERT INTO ff_attempt (
195            partition_key, execution_id, attempt_index,
196            worker_id, worker_instance_id,
197            lease_epoch, lease_expires_at_ms, started_at_ms
198        ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
199        ON CONFLICT (partition_key, execution_id, attempt_index)
200        DO UPDATE SET
201            worker_id = EXCLUDED.worker_id,
202            worker_instance_id = EXCLUDED.worker_instance_id,
203            lease_epoch = ff_attempt.lease_epoch + 1,
204            lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
205            started_at_ms = EXCLUDED.started_at_ms,
206            outcome = NULL
207        "#,
208    )
209    .bind(part)
210    .bind(exec_uuid)
211    .bind(attempt_index_i)
212    .bind(policy.worker_id.as_str())
213    .bind(policy.worker_instance_id.as_str())
214    .bind(expires)
215    .bind(now)
216    .execute(&mut *tx)
217    .await
218    .map_err(map_sqlx_error)?;
219
220    // Re-read the epoch we just wrote.
221    let epoch_row = sqlx::query(
222        r#"
223        SELECT lease_epoch FROM ff_attempt
224         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
225        "#,
226    )
227    .bind(part)
228    .bind(exec_uuid)
229    .bind(attempt_index_i)
230    .fetch_one(&mut *tx)
231    .await
232    .map_err(map_sqlx_error)?;
233    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
234
235    // Flip exec_core to active. #356: `started_at_ms` is set-once —
236    // `COALESCE(started_at_ms, $3)` preserves the original first-claim
237    // timestamp across reclaim + retry attempts, matching Valkey's
238    // dedicated set-once `exec_core["started_at"]` field.
239    //
240    // `public_state = 'running'` mirrors the resume-claim write in
241    // `suspend_ops.rs` and the SQLite first-claim write in
242    // `ff-backend-sqlite/src/queries/exec_core.rs`. Without this field
243    // the row stayed at its create-time `'waiting'` literal on PG
244    // while Spine-B readers expected the claimed-execution literal.
245    sqlx::query(
246        r#"
247        UPDATE ff_exec_core
248           SET lifecycle_phase = 'active',
249               ownership_state = 'leased',
250               eligibility_state = 'not_applicable',
251               public_state = 'running',
252               attempt_state = 'running_attempt',
253               started_at_ms = COALESCE(started_at_ms, $3)
254         WHERE partition_key = $1 AND execution_id = $2
255        "#,
256    )
257    .bind(part)
258    .bind(exec_uuid)
259    .bind(now)
260    .execute(&mut *tx)
261    .await
262    .map_err(map_sqlx_error)?;
263
264    // RFC-019 Stage B outbox: lease acquired.
265    lease_event::emit(
266        &mut tx,
267        part,
268        exec_uuid,
269        None,
270        lease_event::EVENT_ACQUIRED,
271        now,
272    )
273    .await?;
274
275    tx.commit().await.map_err(map_sqlx_error)?;
276
277    let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
278        EngineError::Validation {
279            kind: ff_core::engine_error::ValidationKind::InvalidInput,
280            detail: format!("reassembling exec id: {e}"),
281        }
282    })?;
283    let payload = HandlePayload::new(
284        exec_id,
285        attempt_index,
286        AttemptId::new(),
287        LeaseId::new(),
288        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
289        u64::from(policy.lease_ttl_ms),
290        lane.clone(),
291        policy.worker_instance_id.clone(),
292    );
293    Ok(Some(mint_handle(payload, HandleKind::Fresh)))
294}
295
296// ── claim_from_resume_grant ──────────────────────────────────────────────────
297
298pub(crate) async fn claim_from_resume_grant(
299    pool: &PgPool,
300    token: ResumeToken,
301) -> Result<Option<Handle>, EngineError> {
302    let eid = &token.grant.execution_id;
303    let (part, exec_uuid) = split_exec_id(eid)?;
304    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
305    // Lock the attempt row.
306    let row = sqlx::query(
307        r#"
308        SELECT attempt_index, lease_epoch, lease_expires_at_ms
309          FROM ff_attempt
310         WHERE partition_key = $1 AND execution_id = $2
311         ORDER BY attempt_index DESC
312         FOR UPDATE
313         LIMIT 1
314        "#,
315    )
316    .bind(part)
317    .bind(exec_uuid)
318    .fetch_optional(&mut *tx)
319    .await
320    .map_err(map_sqlx_error)?;
321    let Some(row) = row else {
322        tx.rollback().await.map_err(map_sqlx_error)?;
323        return Err(EngineError::NotFound { entity: "attempt" });
324    };
325    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
326    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
327    let expires_at: Option<i64> = row
328        .try_get::<Option<i64>, _>("lease_expires_at_ms")
329        .map_err(map_sqlx_error)?;
330
331    // Live-lease check. A valid reclaim requires the prior lease
332    // to have expired (lease_expires_at_ms <= now) OR to be NULL
333    // (released).
334    let now = now_ms();
335    let live = matches!(expires_at, Some(exp) if exp > now);
336    if live {
337        tx.rollback().await.map_err(map_sqlx_error)?;
338        return Ok(None); // caller sees `None` — documented below is LeaseConflict; but per trait shape Ok(None) means "grant no longer available". Use Contention when semantics demand hard signal.
339    }
340
341    // Bump epoch + install new worker + fresh expiry.
342    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
343    let new_expires = now.saturating_add(lease_ttl_ms);
344    sqlx::query(
345        r#"
346        UPDATE ff_attempt
347           SET worker_id = $1,
348               worker_instance_id = $2,
349               lease_epoch = lease_epoch + 1,
350               lease_expires_at_ms = $3,
351               started_at_ms = $4,
352               outcome = NULL
353         WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
354        "#,
355    )
356    .bind(token.worker_id.as_str())
357    .bind(token.worker_instance_id.as_str())
358    .bind(new_expires)
359    .bind(now)
360    .bind(part)
361    .bind(exec_uuid)
362    .bind(attempt_index_i)
363    .execute(&mut *tx)
364    .await
365    .map_err(map_sqlx_error)?;
366
367    sqlx::query(
368        r#"
369        UPDATE ff_exec_core
370           SET lifecycle_phase = 'active',
371               ownership_state = 'leased',
372               eligibility_state = 'not_applicable',
373               attempt_state = 'running_attempt'
374         WHERE partition_key = $1 AND execution_id = $2
375        "#,
376    )
377    .bind(part)
378    .bind(exec_uuid)
379    .execute(&mut *tx)
380    .await
381    .map_err(map_sqlx_error)?;
382
383    // RFC-019 Stage B outbox: lease reclaimed.
384    lease_event::emit(
385        &mut tx,
386        part,
387        exec_uuid,
388        None,
389        lease_event::EVENT_RECLAIMED,
390        now,
391    )
392    .await?;
393
394    tx.commit().await.map_err(map_sqlx_error)?;
395
396    let new_epoch = current_epoch.saturating_add(1);
397    let payload = HandlePayload::new(
398        eid.clone(),
399        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
400        AttemptId::new(),
401        LeaseId::new(),
402        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
403        u64::from(token.lease_ttl_ms),
404        token.grant.lane_id.clone(),
405        token.worker_instance_id.clone(),
406    );
407    Ok(Some(mint_handle(payload, HandleKind::Resumed)))
408}
409
410// ── fence check ─────────────────────────────────────────────────────────
411
412/// Re-read the attempt row under FOR UPDATE + validate the handle's
413/// `lease_epoch` matches. Returns the locked row's
414/// `(attempt_index, lease_expires_at_ms)` for callers that need them
415/// for post-update logic. Fence mismatch → `Contention(LeaseConflict)`.
416async fn fence_check<'c>(
417    tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
418    part: i16,
419    exec_uuid: Uuid,
420    attempt_index: i32,
421    expected_epoch: u64,
422) -> Result<(), EngineError> {
423    let row = sqlx::query(
424        r#"
425        SELECT lease_epoch FROM ff_attempt
426         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
427         FOR UPDATE
428        "#,
429    )
430    .bind(part)
431    .bind(exec_uuid)
432    .bind(attempt_index)
433    .fetch_optional(&mut **tx)
434    .await
435    .map_err(map_sqlx_error)?;
436    let Some(row) = row else {
437        return Err(EngineError::NotFound { entity: "attempt" });
438    };
439    let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
440    let observed = u64::try_from(epoch_i).unwrap_or(0);
441    if observed != expected_epoch {
442        return Err(EngineError::Contention(ContentionKind::LeaseConflict));
443    }
444    Ok(())
445}
446
447// ── renew ───────────────────────────────────────────────────────────────
448
449pub(crate) async fn renew(
450    pool: &PgPool,
451    handle: &Handle,
452) -> Result<LeaseRenewal, EngineError> {
453    let payload = decode_handle(handle)?;
454    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
455    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
456    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
457    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
458    let now = now_ms();
459    let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
460    sqlx::query(
461        r#"
462        UPDATE ff_attempt
463           SET lease_expires_at_ms = $1
464         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
465        "#,
466    )
467    .bind(new_expires)
468    .bind(part)
469    .bind(exec_uuid)
470    .bind(attempt_index)
471    .execute(&mut *tx)
472    .await
473    .map_err(map_sqlx_error)?;
474    // RFC-019 Stage B outbox: lease renewed.
475    lease_event::emit(
476        &mut tx,
477        part,
478        exec_uuid,
479        None,
480        lease_event::EVENT_RENEWED,
481        now,
482    )
483    .await?;
484    tx.commit().await.map_err(map_sqlx_error)?;
485    Ok(LeaseRenewal::new(
486        u64::try_from(new_expires).unwrap_or(0),
487        payload.lease_epoch.0,
488    ))
489}
490
491// ── progress ────────────────────────────────────────────────────────────
492
493pub(crate) async fn progress(
494    pool: &PgPool,
495    handle: &Handle,
496    percent: Option<u8>,
497    message: Option<String>,
498) -> Result<(), EngineError> {
499    let payload = decode_handle(handle)?;
500    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
501    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
502    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
503    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
504    // Stash progress on the exec_core raw_fields jsonb. raw_fields is
505    // the Wave-3 contract for "fields that don't yet have a typed
506    // column" — progress_pct + progress_message live here until a
507    // follow-up migration promotes them. This preserves the op's
508    // observable side effect (caller can read it back) without
509    // forking the schema.
510    let mut patch = serde_json::Map::new();
511    if let Some(pct) = percent {
512        patch.insert("progress_pct".into(), serde_json::Value::from(pct));
513    }
514    if let Some(msg) = message {
515        patch.insert("progress_message".into(), serde_json::Value::from(msg));
516    }
517    let patch_val = serde_json::Value::Object(patch);
518    sqlx::query(
519        r#"
520        UPDATE ff_exec_core
521           SET raw_fields = raw_fields || $1::jsonb
522         WHERE partition_key = $2 AND execution_id = $3
523        "#,
524    )
525    .bind(patch_val)
526    .bind(part)
527    .bind(exec_uuid)
528    .execute(&mut *tx)
529    .await
530    .map_err(map_sqlx_error)?;
531    tx.commit().await.map_err(map_sqlx_error)?;
532    Ok(())
533}
534
535// ── complete ────────────────────────────────────────────────────────────
536
537pub(crate) async fn complete(
538    pool: &PgPool,
539    handle: &Handle,
540    payload_bytes: Option<Vec<u8>>,
541) -> Result<(), EngineError> {
542    let payload = decode_handle(handle)?;
543    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
544    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
545    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
546    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
547    let now = now_ms();
548
549    sqlx::query(
550        r#"
551        UPDATE ff_attempt
552           SET terminal_at_ms = $1,
553               outcome = 'success',
554               lease_expires_at_ms = NULL
555         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
556        "#,
557    )
558    .bind(now)
559    .bind(part)
560    .bind(exec_uuid)
561    .bind(attempt_index)
562    .execute(&mut *tx)
563    .await
564    .map_err(map_sqlx_error)?;
565
566    sqlx::query(
567        r#"
568        UPDATE ff_exec_core
569           SET lifecycle_phase = 'terminal',
570               ownership_state = 'unowned',
571               eligibility_state = 'not_applicable',
572               attempt_state = 'attempt_terminal',
573               terminal_at_ms = $1,
574               result = $2
575         WHERE partition_key = $3 AND execution_id = $4
576        "#,
577    )
578    .bind(now)
579    .bind(payload_bytes.as_deref())
580    .bind(part)
581    .bind(exec_uuid)
582    .execute(&mut *tx)
583    .await
584    .map_err(map_sqlx_error)?;
585
586    // Outbox: emit completion event → trigger fires pg_notify.
587    sqlx::query(
588        r#"
589        INSERT INTO ff_completion_event (
590            partition_key, execution_id, flow_id, outcome,
591            namespace, instance_tag, occurred_at_ms
592        )
593        SELECT partition_key, execution_id, flow_id, 'success',
594               NULL, NULL, $3
595          FROM ff_exec_core
596         WHERE partition_key = $1 AND execution_id = $2
597        "#,
598    )
599    .bind(part)
600    .bind(exec_uuid)
601    .bind(now)
602    .execute(&mut *tx)
603    .await
604    .map_err(map_sqlx_error)?;
605
606    // RFC-019 Stage B outbox: lease revoked (terminal success).
607    lease_event::emit(
608        &mut tx,
609        part,
610        exec_uuid,
611        None,
612        lease_event::EVENT_REVOKED,
613        now,
614    )
615    .await?;
616
617    tx.commit().await.map_err(map_sqlx_error)?;
618    Ok(())
619}
620
621// ── fail ────────────────────────────────────────────────────────────────
622
623pub(crate) async fn fail(
624    pool: &PgPool,
625    handle: &Handle,
626    reason: FailureReason,
627    classification: FailureClass,
628) -> Result<FailOutcome, EngineError> {
629    let payload = decode_handle(handle)?;
630    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
631    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
632    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
633    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
634    let now = now_ms();
635    let retryable = matches!(
636        classification,
637        FailureClass::Transient | FailureClass::InfraCrash
638    );
639
640    if retryable {
641        // Retry: re-enqueue the exec, release lease, bump attempt_index.
642        sqlx::query(
643            r#"
644            UPDATE ff_attempt
645               SET terminal_at_ms = $1,
646                   outcome = 'retry',
647                   lease_expires_at_ms = NULL
648             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
649            "#,
650        )
651        .bind(now)
652        .bind(part)
653        .bind(exec_uuid)
654        .bind(attempt_index)
655        .execute(&mut *tx)
656        .await
657        .map_err(map_sqlx_error)?;
658
659        sqlx::query(
660            r#"
661            UPDATE ff_exec_core
662               SET lifecycle_phase = 'runnable',
663                   ownership_state = 'unowned',
664                   eligibility_state = 'eligible_now',
665                   attempt_state = 'pending_retry_attempt',
666                   attempt_index = attempt_index + 1,
667                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
668             WHERE partition_key = $2 AND execution_id = $3
669            "#,
670        )
671        .bind(&reason.message)
672        .bind(part)
673        .bind(exec_uuid)
674        .execute(&mut *tx)
675        .await
676        .map_err(map_sqlx_error)?;
677
678        // RFC-019 Stage B outbox: lease revoked (retry scheduled).
679        lease_event::emit(
680            &mut tx,
681            part,
682            exec_uuid,
683            None,
684            lease_event::EVENT_REVOKED,
685            now,
686        )
687        .await?;
688
689        tx.commit().await.map_err(map_sqlx_error)?;
690        Ok(FailOutcome::RetryScheduled {
691            delay_until: TimestampMs::from_millis(now),
692        })
693    } else {
694        // Terminal-failed.
695        sqlx::query(
696            r#"
697            UPDATE ff_attempt
698               SET terminal_at_ms = $1,
699                   outcome = 'failed',
700                   lease_expires_at_ms = NULL
701             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
702            "#,
703        )
704        .bind(now)
705        .bind(part)
706        .bind(exec_uuid)
707        .bind(attempt_index)
708        .execute(&mut *tx)
709        .await
710        .map_err(map_sqlx_error)?;
711
712        sqlx::query(
713            r#"
714            UPDATE ff_exec_core
715               SET lifecycle_phase = 'terminal',
716                   ownership_state = 'unowned',
717                   eligibility_state = 'not_applicable',
718                   attempt_state = 'attempt_terminal',
719                   terminal_at_ms = $1,
720                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
721             WHERE partition_key = $3 AND execution_id = $4
722            "#,
723        )
724        .bind(now)
725        .bind(&reason.message)
726        .bind(part)
727        .bind(exec_uuid)
728        .execute(&mut *tx)
729        .await
730        .map_err(map_sqlx_error)?;
731
732        sqlx::query(
733            r#"
734            INSERT INTO ff_completion_event (
735                partition_key, execution_id, flow_id, outcome,
736                namespace, instance_tag, occurred_at_ms
737            )
738            SELECT partition_key, execution_id, flow_id, 'failed',
739                   NULL, NULL, $3
740              FROM ff_exec_core
741             WHERE partition_key = $1 AND execution_id = $2
742            "#,
743        )
744        .bind(part)
745        .bind(exec_uuid)
746        .bind(now)
747        .execute(&mut *tx)
748        .await
749        .map_err(map_sqlx_error)?;
750
751        // RFC-019 Stage B outbox: lease revoked (terminal fail).
752        lease_event::emit(
753            &mut tx,
754            part,
755            exec_uuid,
756            None,
757            lease_event::EVENT_REVOKED,
758            now,
759        )
760        .await?;
761
762        tx.commit().await.map_err(map_sqlx_error)?;
763        Ok(FailOutcome::TerminalFailed)
764    }
765}
766
767// ── delay ───────────────────────────────────────────────────────────────
768
769pub(crate) async fn delay(
770    pool: &PgPool,
771    handle: &Handle,
772    delay_until: TimestampMs,
773) -> Result<(), EngineError> {
774    let payload = decode_handle(handle)?;
775    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
776    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
777    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
778    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
779
780    sqlx::query(
781        r#"
782        UPDATE ff_attempt
783           SET outcome = 'interrupted',
784               lease_expires_at_ms = NULL
785         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
786        "#,
787    )
788    .bind(part)
789    .bind(exec_uuid)
790    .bind(attempt_index)
791    .execute(&mut *tx)
792    .await
793    .map_err(map_sqlx_error)?;
794
795    sqlx::query(
796        r#"
797        UPDATE ff_exec_core
798           SET lifecycle_phase = 'runnable',
799               ownership_state = 'unowned',
800               eligibility_state = 'not_eligible_until_time',
801               attempt_state = 'attempt_interrupted',
802               deadline_at_ms = $1
803         WHERE partition_key = $2 AND execution_id = $3
804        "#,
805    )
806    .bind(delay_until.0)
807    .bind(part)
808    .bind(exec_uuid)
809    .execute(&mut *tx)
810    .await
811    .map_err(map_sqlx_error)?;
812
813    // RFC-019 Stage B outbox: lease revoked (delay).
814    lease_event::emit(
815        &mut tx,
816        part,
817        exec_uuid,
818        None,
819        lease_event::EVENT_REVOKED,
820        now_ms(),
821    )
822    .await?;
823
824    tx.commit().await.map_err(map_sqlx_error)?;
825    Ok(())
826}
827
828// ── wait_children ───────────────────────────────────────────────────────
829
830pub(crate) async fn wait_children(
831    pool: &PgPool,
832    handle: &Handle,
833) -> Result<(), EngineError> {
834    let payload = decode_handle(handle)?;
835    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
836    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
837    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
838    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
839
840    sqlx::query(
841        r#"
842        UPDATE ff_attempt
843           SET outcome = 'waiting_children',
844               lease_expires_at_ms = NULL
845         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
846        "#,
847    )
848    .bind(part)
849    .bind(exec_uuid)
850    .bind(attempt_index)
851    .execute(&mut *tx)
852    .await
853    .map_err(map_sqlx_error)?;
854
855    sqlx::query(
856        r#"
857        UPDATE ff_exec_core
858           SET lifecycle_phase = 'runnable',
859               ownership_state = 'unowned',
860               eligibility_state = 'blocked_by_dependencies',
861               attempt_state = 'attempt_interrupted',
862               blocking_reason = 'waiting_for_children'
863         WHERE partition_key = $1 AND execution_id = $2
864        "#,
865    )
866    .bind(part)
867    .bind(exec_uuid)
868    .execute(&mut *tx)
869    .await
870    .map_err(map_sqlx_error)?;
871
872    // RFC-019 Stage B outbox: lease revoked (wait_children).
873    lease_event::emit(
874        &mut tx,
875        part,
876        exec_uuid,
877        None,
878        lease_event::EVENT_REVOKED,
879        now_ms(),
880    )
881    .await?;
882
883    tx.commit().await.map_err(map_sqlx_error)?;
884    Ok(())
885}
886