Skip to main content

ff_backend_postgres/
attempt.rs

1//! Attempt trait-method family — Postgres implementation.
2//!
3//! **RFC-v0.7 Wave 4b.** Bodies for `claim`, `claim_from_reclaim`,
4//! `renew`, `progress`, `complete`, `fail`, `delay`, `wait_children`.
5//!
6//! # Fence-triple invariants (RFC-003)
7//!
8//! Every RMW against `ff_attempt` is wrapped in a BEGIN/COMMIT + a
9//! `SELECT ... FOR UPDATE` of the target attempt row. The claim cascade
10//! uses `FOR UPDATE SKIP LOCKED` on the exec_core + attempt join so two
11//! workers racing on the same lane cannot double-claim. Lease epoch is
12//! the authoritative fencing token: every terminal/progress/renew op
13//! first re-reads the attempt row under lock and compares
14//! `lease_epoch` against the handle's embedded epoch. Mismatch →
15//! `EngineError::Contention(LeaseConflict)`.
16//!
17//! # Isolation level (Q11)
18//!
19//! Default `READ COMMITTED`. The claim query uses `SKIP LOCKED` to
20//! keep the scanner contention-free; terminal ops use row-level
21//! `FOR UPDATE` on the already-identified attempt row. This is enough
22//! for the fence-triple invariant because the primary key on
23//! `ff_attempt` is `(partition_key, execution_id, attempt_index)` —
24//! the `FOR UPDATE` is partition-local and deterministic.
25//!
26//! # Isolation note on capability matching
27//!
28//! Capability subset-match runs in Rust *after* the FOR UPDATE SKIP
29//! LOCKED row is obtained. If the worker does not satisfy the row's
30//! required_capabilities, we `ROLLBACK` (releasing the lock) and
31//! return `Ok(None)` so another worker with the right caps can pick
32//! the row up. We do NOT try to re-scan within the same claim call
33//! — that would blow the blocking-wait budget in a pathological
34//! mismatch loop; the caller's retry cadence is the right scope.
35
36use ff_core::backend::{
37    BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38    HandleKind, LeaseRenewal, ReclaimToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51use crate::lease_event;
52
53// ── helpers ─────────────────────────────────────────────────────────────
54
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch yields `0`; a millisecond count
/// that does not fit in an `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    match i64::try_from(elapsed_ms) {
        Ok(ms) => ms,
        Err(_) => i64::MAX,
    }
}
64
65/// Extract `(partition_index, uuid)` from an `ExecutionId` formatted
66/// `{fp:N}:<uuid>`.
67fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
68    let s = eid.as_str();
69    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
70        kind: ff_core::engine_error::ValidationKind::InvalidInput,
71        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
72    })?;
73    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
74        kind: ff_core::engine_error::ValidationKind::InvalidInput,
75        detail: format!("execution_id missing `}}:`: {s}"),
76    })?;
77    let part: i16 = rest[..close]
78        .parse()
79        .map_err(|_| EngineError::Validation {
80            kind: ff_core::engine_error::ValidationKind::InvalidInput,
81            detail: format!("execution_id partition index not u16: {s}"),
82        })?;
83    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
84        kind: ff_core::engine_error::ValidationKind::InvalidInput,
85        detail: format!("execution_id UUID invalid: {s}"),
86    })?;
87    Ok((part, uuid))
88}
89
90fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
91    if handle.backend != BackendTag::Postgres {
92        return Err(EngineError::Validation {
93            kind: ff_core::engine_error::ValidationKind::Corruption,
94            detail: format!(
95                "handle minted by {:?} passed to Postgres backend",
96                handle.backend
97            ),
98        });
99    }
100    let decoded = decode_opaque(&handle.opaque)?;
101    if decoded.tag != BackendTag::Postgres {
102        return Err(EngineError::Validation {
103            kind: ff_core::engine_error::ValidationKind::Corruption,
104            detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
105        });
106    }
107    Ok(decoded.payload)
108}
109
110fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
111    let op = encode_opaque(BackendTag::Postgres, &payload);
112    Handle::new(BackendTag::Postgres, kind, op)
113}
114
115// ── claim ───────────────────────────────────────────────────────────────
116
117pub(crate) async fn claim(
118    pool: &PgPool,
119    lane: &LaneId,
120    capabilities: &CapabilitySet,
121    policy: &ClaimPolicy,
122) -> Result<Option<Handle>, EngineError> {
123    // We scan each partition in random order. For Wave 4b we iterate
124    // partitions 0..256; a production path would use a sampled order +
125    // per-lane cache. Keeping the happy-path simple: the test fixture
126    // inserts into a known partition, so we scan all 256.
127    let total_partitions: i16 = 256;
128    for part in 0..total_partitions {
129        match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
130            Some(h) => return Ok(Some(h)),
131            None => continue,
132        }
133    }
134    Ok(None)
135}
136
137async fn try_claim_in_partition(
138    pool: &PgPool,
139    part: i16,
140    lane: &LaneId,
141    capabilities: &CapabilitySet,
142    policy: &ClaimPolicy,
143) -> Result<Option<Handle>, EngineError> {
144    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
145    // SELECT one eligible exec row in this partition/lane — FOR UPDATE
146    // SKIP LOCKED keeps contending workers from pile-ups.
147    let row = sqlx::query(
148        r#"
149        SELECT execution_id, required_capabilities, attempt_index
150          FROM ff_exec_core
151         WHERE partition_key = $1
152           AND lane_id = $2
153           AND lifecycle_phase = 'runnable'
154           AND eligibility_state = 'eligible_now'
155         ORDER BY priority DESC, created_at_ms ASC
156         FOR UPDATE SKIP LOCKED
157         LIMIT 1
158        "#,
159    )
160    .bind(part)
161    .bind(lane.as_str())
162    .fetch_optional(&mut *tx)
163    .await
164    .map_err(map_sqlx_error)?;
165
166    let Some(row) = row else {
167        // No candidate in this partition — release the tx.
168        tx.rollback().await.map_err(map_sqlx_error)?;
169        return Ok(None);
170    };
171
172    let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
173    let required_caps: Vec<String> = row
174        .try_get::<Vec<String>, _>("required_capabilities")
175        .map_err(map_sqlx_error)?;
176    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
177    let req = CapabilityRequirement::new(required_caps);
178    if !caps_matches(&req, capabilities) {
179        // Release the exec row lock; skip.
180        tx.rollback().await.map_err(map_sqlx_error)?;
181        return Ok(None);
182    }
183
184    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
185    let now = now_ms();
186    let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
187    let expires = now.saturating_add(lease_ttl_ms);
188
189    // UPSERT the attempt row: fresh lease epoch 1 on first claim; on
190    // a retry attempt the attempt_index is new so the PK doesn't
191    // collide. We always INSERT ON CONFLICT DO UPDATE to be safe.
192    sqlx::query(
193        r#"
194        INSERT INTO ff_attempt (
195            partition_key, execution_id, attempt_index,
196            worker_id, worker_instance_id,
197            lease_epoch, lease_expires_at_ms, started_at_ms
198        ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
199        ON CONFLICT (partition_key, execution_id, attempt_index)
200        DO UPDATE SET
201            worker_id = EXCLUDED.worker_id,
202            worker_instance_id = EXCLUDED.worker_instance_id,
203            lease_epoch = ff_attempt.lease_epoch + 1,
204            lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
205            started_at_ms = EXCLUDED.started_at_ms,
206            outcome = NULL
207        "#,
208    )
209    .bind(part)
210    .bind(exec_uuid)
211    .bind(attempt_index_i)
212    .bind(policy.worker_id.as_str())
213    .bind(policy.worker_instance_id.as_str())
214    .bind(expires)
215    .bind(now)
216    .execute(&mut *tx)
217    .await
218    .map_err(map_sqlx_error)?;
219
220    // Re-read the epoch we just wrote.
221    let epoch_row = sqlx::query(
222        r#"
223        SELECT lease_epoch FROM ff_attempt
224         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
225        "#,
226    )
227    .bind(part)
228    .bind(exec_uuid)
229    .bind(attempt_index_i)
230    .fetch_one(&mut *tx)
231    .await
232    .map_err(map_sqlx_error)?;
233    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
234
235    // Flip exec_core to active.
236    sqlx::query(
237        r#"
238        UPDATE ff_exec_core
239           SET lifecycle_phase = 'active',
240               ownership_state = 'leased',
241               eligibility_state = 'not_applicable',
242               attempt_state = 'running_attempt'
243         WHERE partition_key = $1 AND execution_id = $2
244        "#,
245    )
246    .bind(part)
247    .bind(exec_uuid)
248    .execute(&mut *tx)
249    .await
250    .map_err(map_sqlx_error)?;
251
252    // RFC-019 Stage B outbox: lease acquired.
253    lease_event::emit(
254        &mut tx,
255        part,
256        exec_uuid,
257        None,
258        lease_event::EVENT_ACQUIRED,
259        now,
260    )
261    .await?;
262
263    tx.commit().await.map_err(map_sqlx_error)?;
264
265    let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
266        EngineError::Validation {
267            kind: ff_core::engine_error::ValidationKind::InvalidInput,
268            detail: format!("reassembling exec id: {e}"),
269        }
270    })?;
271    let payload = HandlePayload::new(
272        exec_id,
273        attempt_index,
274        AttemptId::new(),
275        LeaseId::new(),
276        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
277        u64::from(policy.lease_ttl_ms),
278        lane.clone(),
279        policy.worker_instance_id.clone(),
280    );
281    Ok(Some(mint_handle(payload, HandleKind::Fresh)))
282}
283
284// ── claim_from_reclaim ──────────────────────────────────────────────────
285
286pub(crate) async fn claim_from_reclaim(
287    pool: &PgPool,
288    token: ReclaimToken,
289) -> Result<Option<Handle>, EngineError> {
290    let eid = &token.grant.execution_id;
291    let (part, exec_uuid) = split_exec_id(eid)?;
292    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
293    // Lock the attempt row.
294    let row = sqlx::query(
295        r#"
296        SELECT attempt_index, lease_epoch, lease_expires_at_ms
297          FROM ff_attempt
298         WHERE partition_key = $1 AND execution_id = $2
299         ORDER BY attempt_index DESC
300         FOR UPDATE
301         LIMIT 1
302        "#,
303    )
304    .bind(part)
305    .bind(exec_uuid)
306    .fetch_optional(&mut *tx)
307    .await
308    .map_err(map_sqlx_error)?;
309    let Some(row) = row else {
310        tx.rollback().await.map_err(map_sqlx_error)?;
311        return Err(EngineError::NotFound { entity: "attempt" });
312    };
313    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
314    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
315    let expires_at: Option<i64> = row
316        .try_get::<Option<i64>, _>("lease_expires_at_ms")
317        .map_err(map_sqlx_error)?;
318
319    // Live-lease check. A valid reclaim requires the prior lease
320    // to have expired (lease_expires_at_ms <= now) OR to be NULL
321    // (released).
322    let now = now_ms();
323    let live = matches!(expires_at, Some(exp) if exp > now);
324    if live {
325        tx.rollback().await.map_err(map_sqlx_error)?;
326        return Ok(None); // caller sees `None` — documented below is LeaseConflict; but per trait shape Ok(None) means "grant no longer available". Use Contention when semantics demand hard signal.
327    }
328
329    // Bump epoch + install new worker + fresh expiry.
330    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
331    let new_expires = now.saturating_add(lease_ttl_ms);
332    sqlx::query(
333        r#"
334        UPDATE ff_attempt
335           SET worker_id = $1,
336               worker_instance_id = $2,
337               lease_epoch = lease_epoch + 1,
338               lease_expires_at_ms = $3,
339               started_at_ms = $4,
340               outcome = NULL
341         WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
342        "#,
343    )
344    .bind(token.worker_id.as_str())
345    .bind(token.worker_instance_id.as_str())
346    .bind(new_expires)
347    .bind(now)
348    .bind(part)
349    .bind(exec_uuid)
350    .bind(attempt_index_i)
351    .execute(&mut *tx)
352    .await
353    .map_err(map_sqlx_error)?;
354
355    sqlx::query(
356        r#"
357        UPDATE ff_exec_core
358           SET lifecycle_phase = 'active',
359               ownership_state = 'leased',
360               eligibility_state = 'not_applicable',
361               attempt_state = 'running_attempt'
362         WHERE partition_key = $1 AND execution_id = $2
363        "#,
364    )
365    .bind(part)
366    .bind(exec_uuid)
367    .execute(&mut *tx)
368    .await
369    .map_err(map_sqlx_error)?;
370
371    // RFC-019 Stage B outbox: lease reclaimed.
372    lease_event::emit(
373        &mut tx,
374        part,
375        exec_uuid,
376        None,
377        lease_event::EVENT_RECLAIMED,
378        now,
379    )
380    .await?;
381
382    tx.commit().await.map_err(map_sqlx_error)?;
383
384    let new_epoch = current_epoch.saturating_add(1);
385    let payload = HandlePayload::new(
386        eid.clone(),
387        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
388        AttemptId::new(),
389        LeaseId::new(),
390        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
391        u64::from(token.lease_ttl_ms),
392        token.grant.lane_id.clone(),
393        token.worker_instance_id.clone(),
394    );
395    Ok(Some(mint_handle(payload, HandleKind::Resumed)))
396}
397
398// ── fence check ─────────────────────────────────────────────────────────
399
400/// Re-read the attempt row under FOR UPDATE + validate the handle's
401/// `lease_epoch` matches. Returns the locked row's
402/// `(attempt_index, lease_expires_at_ms)` for callers that need them
403/// for post-update logic. Fence mismatch → `Contention(LeaseConflict)`.
404async fn fence_check<'c>(
405    tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
406    part: i16,
407    exec_uuid: Uuid,
408    attempt_index: i32,
409    expected_epoch: u64,
410) -> Result<(), EngineError> {
411    let row = sqlx::query(
412        r#"
413        SELECT lease_epoch FROM ff_attempt
414         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
415         FOR UPDATE
416        "#,
417    )
418    .bind(part)
419    .bind(exec_uuid)
420    .bind(attempt_index)
421    .fetch_optional(&mut **tx)
422    .await
423    .map_err(map_sqlx_error)?;
424    let Some(row) = row else {
425        return Err(EngineError::NotFound { entity: "attempt" });
426    };
427    let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
428    let observed = u64::try_from(epoch_i).unwrap_or(0);
429    if observed != expected_epoch {
430        return Err(EngineError::Contention(ContentionKind::LeaseConflict));
431    }
432    Ok(())
433}
434
435// ── renew ───────────────────────────────────────────────────────────────
436
437pub(crate) async fn renew(
438    pool: &PgPool,
439    handle: &Handle,
440) -> Result<LeaseRenewal, EngineError> {
441    let payload = decode_handle(handle)?;
442    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
443    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
444    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
445    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
446    let now = now_ms();
447    let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
448    sqlx::query(
449        r#"
450        UPDATE ff_attempt
451           SET lease_expires_at_ms = $1
452         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
453        "#,
454    )
455    .bind(new_expires)
456    .bind(part)
457    .bind(exec_uuid)
458    .bind(attempt_index)
459    .execute(&mut *tx)
460    .await
461    .map_err(map_sqlx_error)?;
462    // RFC-019 Stage B outbox: lease renewed.
463    lease_event::emit(
464        &mut tx,
465        part,
466        exec_uuid,
467        None,
468        lease_event::EVENT_RENEWED,
469        now,
470    )
471    .await?;
472    tx.commit().await.map_err(map_sqlx_error)?;
473    Ok(LeaseRenewal::new(
474        u64::try_from(new_expires).unwrap_or(0),
475        payload.lease_epoch.0,
476    ))
477}
478
479// ── progress ────────────────────────────────────────────────────────────
480
481pub(crate) async fn progress(
482    pool: &PgPool,
483    handle: &Handle,
484    percent: Option<u8>,
485    message: Option<String>,
486) -> Result<(), EngineError> {
487    let payload = decode_handle(handle)?;
488    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
489    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
490    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
491    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
492    // Stash progress on the exec_core raw_fields jsonb. raw_fields is
493    // the Wave-3 contract for "fields that don't yet have a typed
494    // column" — progress_pct + progress_message live here until a
495    // follow-up migration promotes them. This preserves the op's
496    // observable side effect (caller can read it back) without
497    // forking the schema.
498    let mut patch = serde_json::Map::new();
499    if let Some(pct) = percent {
500        patch.insert("progress_pct".into(), serde_json::Value::from(pct));
501    }
502    if let Some(msg) = message {
503        patch.insert("progress_message".into(), serde_json::Value::from(msg));
504    }
505    let patch_val = serde_json::Value::Object(patch);
506    sqlx::query(
507        r#"
508        UPDATE ff_exec_core
509           SET raw_fields = raw_fields || $1::jsonb
510         WHERE partition_key = $2 AND execution_id = $3
511        "#,
512    )
513    .bind(patch_val)
514    .bind(part)
515    .bind(exec_uuid)
516    .execute(&mut *tx)
517    .await
518    .map_err(map_sqlx_error)?;
519    tx.commit().await.map_err(map_sqlx_error)?;
520    Ok(())
521}
522
523// ── complete ────────────────────────────────────────────────────────────
524
525pub(crate) async fn complete(
526    pool: &PgPool,
527    handle: &Handle,
528    payload_bytes: Option<Vec<u8>>,
529) -> Result<(), EngineError> {
530    let payload = decode_handle(handle)?;
531    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
532    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
533    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
534    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
535    let now = now_ms();
536
537    sqlx::query(
538        r#"
539        UPDATE ff_attempt
540           SET terminal_at_ms = $1,
541               outcome = 'success',
542               lease_expires_at_ms = NULL
543         WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
544        "#,
545    )
546    .bind(now)
547    .bind(part)
548    .bind(exec_uuid)
549    .bind(attempt_index)
550    .execute(&mut *tx)
551    .await
552    .map_err(map_sqlx_error)?;
553
554    sqlx::query(
555        r#"
556        UPDATE ff_exec_core
557           SET lifecycle_phase = 'terminal',
558               ownership_state = 'unowned',
559               eligibility_state = 'not_applicable',
560               attempt_state = 'attempt_terminal',
561               terminal_at_ms = $1,
562               result = $2
563         WHERE partition_key = $3 AND execution_id = $4
564        "#,
565    )
566    .bind(now)
567    .bind(payload_bytes.as_deref())
568    .bind(part)
569    .bind(exec_uuid)
570    .execute(&mut *tx)
571    .await
572    .map_err(map_sqlx_error)?;
573
574    // Outbox: emit completion event → trigger fires pg_notify.
575    sqlx::query(
576        r#"
577        INSERT INTO ff_completion_event (
578            partition_key, execution_id, flow_id, outcome,
579            namespace, instance_tag, occurred_at_ms
580        )
581        SELECT partition_key, execution_id, flow_id, 'success',
582               NULL, NULL, $3
583          FROM ff_exec_core
584         WHERE partition_key = $1 AND execution_id = $2
585        "#,
586    )
587    .bind(part)
588    .bind(exec_uuid)
589    .bind(now)
590    .execute(&mut *tx)
591    .await
592    .map_err(map_sqlx_error)?;
593
594    // RFC-019 Stage B outbox: lease revoked (terminal success).
595    lease_event::emit(
596        &mut tx,
597        part,
598        exec_uuid,
599        None,
600        lease_event::EVENT_REVOKED,
601        now,
602    )
603    .await?;
604
605    tx.commit().await.map_err(map_sqlx_error)?;
606    Ok(())
607}
608
609// ── fail ────────────────────────────────────────────────────────────────
610
611pub(crate) async fn fail(
612    pool: &PgPool,
613    handle: &Handle,
614    reason: FailureReason,
615    classification: FailureClass,
616) -> Result<FailOutcome, EngineError> {
617    let payload = decode_handle(handle)?;
618    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
619    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
620    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
621    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
622    let now = now_ms();
623    let retryable = matches!(
624        classification,
625        FailureClass::Transient | FailureClass::InfraCrash
626    );
627
628    if retryable {
629        // Retry: re-enqueue the exec, release lease, bump attempt_index.
630        sqlx::query(
631            r#"
632            UPDATE ff_attempt
633               SET terminal_at_ms = $1,
634                   outcome = 'retry',
635                   lease_expires_at_ms = NULL
636             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
637            "#,
638        )
639        .bind(now)
640        .bind(part)
641        .bind(exec_uuid)
642        .bind(attempt_index)
643        .execute(&mut *tx)
644        .await
645        .map_err(map_sqlx_error)?;
646
647        sqlx::query(
648            r#"
649            UPDATE ff_exec_core
650               SET lifecycle_phase = 'runnable',
651                   ownership_state = 'unowned',
652                   eligibility_state = 'eligible_now',
653                   attempt_state = 'pending_retry_attempt',
654                   attempt_index = attempt_index + 1,
655                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
656             WHERE partition_key = $2 AND execution_id = $3
657            "#,
658        )
659        .bind(&reason.message)
660        .bind(part)
661        .bind(exec_uuid)
662        .execute(&mut *tx)
663        .await
664        .map_err(map_sqlx_error)?;
665
666        // RFC-019 Stage B outbox: lease revoked (retry scheduled).
667        lease_event::emit(
668            &mut tx,
669            part,
670            exec_uuid,
671            None,
672            lease_event::EVENT_REVOKED,
673            now,
674        )
675        .await?;
676
677        tx.commit().await.map_err(map_sqlx_error)?;
678        Ok(FailOutcome::RetryScheduled {
679            delay_until: TimestampMs::from_millis(now),
680        })
681    } else {
682        // Terminal-failed.
683        sqlx::query(
684            r#"
685            UPDATE ff_attempt
686               SET terminal_at_ms = $1,
687                   outcome = 'failed',
688                   lease_expires_at_ms = NULL
689             WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
690            "#,
691        )
692        .bind(now)
693        .bind(part)
694        .bind(exec_uuid)
695        .bind(attempt_index)
696        .execute(&mut *tx)
697        .await
698        .map_err(map_sqlx_error)?;
699
700        sqlx::query(
701            r#"
702            UPDATE ff_exec_core
703               SET lifecycle_phase = 'terminal',
704                   ownership_state = 'unowned',
705                   eligibility_state = 'not_applicable',
706                   attempt_state = 'attempt_terminal',
707                   terminal_at_ms = $1,
708                   raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
709             WHERE partition_key = $3 AND execution_id = $4
710            "#,
711        )
712        .bind(now)
713        .bind(&reason.message)
714        .bind(part)
715        .bind(exec_uuid)
716        .execute(&mut *tx)
717        .await
718        .map_err(map_sqlx_error)?;
719
720        sqlx::query(
721            r#"
722            INSERT INTO ff_completion_event (
723                partition_key, execution_id, flow_id, outcome,
724                namespace, instance_tag, occurred_at_ms
725            )
726            SELECT partition_key, execution_id, flow_id, 'failed',
727                   NULL, NULL, $3
728              FROM ff_exec_core
729             WHERE partition_key = $1 AND execution_id = $2
730            "#,
731        )
732        .bind(part)
733        .bind(exec_uuid)
734        .bind(now)
735        .execute(&mut *tx)
736        .await
737        .map_err(map_sqlx_error)?;
738
739        // RFC-019 Stage B outbox: lease revoked (terminal fail).
740        lease_event::emit(
741            &mut tx,
742            part,
743            exec_uuid,
744            None,
745            lease_event::EVENT_REVOKED,
746            now,
747        )
748        .await?;
749
750        tx.commit().await.map_err(map_sqlx_error)?;
751        Ok(FailOutcome::TerminalFailed)
752    }
753}
754
755// ── delay ───────────────────────────────────────────────────────────────
756
757pub(crate) async fn delay(
758    pool: &PgPool,
759    handle: &Handle,
760    delay_until: TimestampMs,
761) -> Result<(), EngineError> {
762    let payload = decode_handle(handle)?;
763    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
764    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
765    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
766    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
767
768    sqlx::query(
769        r#"
770        UPDATE ff_attempt
771           SET outcome = 'interrupted',
772               lease_expires_at_ms = NULL
773         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
774        "#,
775    )
776    .bind(part)
777    .bind(exec_uuid)
778    .bind(attempt_index)
779    .execute(&mut *tx)
780    .await
781    .map_err(map_sqlx_error)?;
782
783    sqlx::query(
784        r#"
785        UPDATE ff_exec_core
786           SET lifecycle_phase = 'runnable',
787               ownership_state = 'unowned',
788               eligibility_state = 'not_eligible_until_time',
789               attempt_state = 'attempt_interrupted',
790               deadline_at_ms = $1
791         WHERE partition_key = $2 AND execution_id = $3
792        "#,
793    )
794    .bind(delay_until.0)
795    .bind(part)
796    .bind(exec_uuid)
797    .execute(&mut *tx)
798    .await
799    .map_err(map_sqlx_error)?;
800
801    // RFC-019 Stage B outbox: lease revoked (delay).
802    lease_event::emit(
803        &mut tx,
804        part,
805        exec_uuid,
806        None,
807        lease_event::EVENT_REVOKED,
808        now_ms(),
809    )
810    .await?;
811
812    tx.commit().await.map_err(map_sqlx_error)?;
813    Ok(())
814}
815
816// ── wait_children ───────────────────────────────────────────────────────
817
818pub(crate) async fn wait_children(
819    pool: &PgPool,
820    handle: &Handle,
821) -> Result<(), EngineError> {
822    let payload = decode_handle(handle)?;
823    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
824    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
825    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
826    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
827
828    sqlx::query(
829        r#"
830        UPDATE ff_attempt
831           SET outcome = 'waiting_children',
832               lease_expires_at_ms = NULL
833         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
834        "#,
835    )
836    .bind(part)
837    .bind(exec_uuid)
838    .bind(attempt_index)
839    .execute(&mut *tx)
840    .await
841    .map_err(map_sqlx_error)?;
842
843    sqlx::query(
844        r#"
845        UPDATE ff_exec_core
846           SET lifecycle_phase = 'runnable',
847               ownership_state = 'unowned',
848               eligibility_state = 'blocked_by_dependencies',
849               attempt_state = 'attempt_interrupted',
850               blocking_reason = 'waiting_for_children'
851         WHERE partition_key = $1 AND execution_id = $2
852        "#,
853    )
854    .bind(part)
855    .bind(exec_uuid)
856    .execute(&mut *tx)
857    .await
858    .map_err(map_sqlx_error)?;
859
860    // RFC-019 Stage B outbox: lease revoked (wait_children).
861    lease_event::emit(
862        &mut tx,
863        part,
864        exec_uuid,
865        None,
866        lease_event::EVENT_REVOKED,
867        now_ms(),
868    )
869    .await?;
870
871    tx.commit().await.map_err(map_sqlx_error)?;
872    Ok(())
873}
874