// ff_backend_postgres — src/operator.rs
//! RFC-020 Wave 9 Spine-A pt.1 — operator-control mutating methods
//! (`cancel_execution` + `revoke_lease`).
//!
//! Both methods follow the §4.2 shared spine template:
//!
//!   1. `BEGIN ISOLATION LEVEL SERIALIZABLE`.
//!   2. `SELECT ... FOR UPDATE` on `ff_exec_core` (+ `ff_attempt`
//!      for the current-attempt row) — captures pre-state and
//!      acquires row locks against the `lease_expiry` /
//!      `attempt_timeout` reconcilers (§4.2.6).
//!   3. Validate against Valkey-canonical semantics (§5.2).
//!   4. Mutate with compare-and-set fencing (lease_epoch CAS on
//!      `revoke_lease`; lifecycle_phase check on `cancel_execution`).
//!   5. Emit the outbox row per §4.2.7 matrix (`ff_lease_event`).
//!   6. `COMMIT`. On `40001` / `40P01` (serialization / deadlock)
//!      retry up to `CANCEL_FLOW_MAX_ATTEMPTS = 3`; exhaustion
//!      surfaces `ContentionKind::RetryExhausted` to the caller.
//!
//! Per §4.2.6: existing reconcilers (`lease_expiry`, `attempt_timeout`)
//! filter on `lifecycle_phase = 'active'`, so a `'cancelled'` row
//! surfaced by `cancel_execution` is silently skipped — no new
//! reconciler modifications required.
23
24use std::time::Duration;
25
26use ff_core::contracts::{
27    CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs, CancelFlowHeader,
28    ChangePriorityArgs, ChangePriorityResult, ReplayExecutionArgs, ReplayExecutionResult,
29    RevokeLeaseArgs, RevokeLeaseResult,
30};
31use ff_core::engine_error::{ContentionKind, EngineError, StateKind, ValidationKind};
32use ff_core::state::PublicState;
33use ff_core::types::{CancelSource, ExecutionId, FlowId};
34use serde_json::json;
35use sqlx::{PgPool, Postgres, Row};
36use uuid::Uuid;
37
38use crate::error::map_sqlx_error;
39use crate::{lease_event, operator_event};
40
/// Max attempts on `40001` / `40P01` serialization / deadlock
/// conflicts before surfacing `ContentionKind::RetryExhausted`.
/// Matches [`crate::flow::CANCEL_FLOW_MAX_ATTEMPTS`] (RFC §4.2).
const MAX_ATTEMPTS: u32 = 3;
44
45/// Extract the raw UUID suffix from an `ExecutionId`'s wire form
46/// (`"{fp:N}:<uuid>"`). Mirrors [`crate::exec_core::eid_uuid`].
47fn eid_uuid(eid: &ff_core::types::ExecutionId) -> Uuid {
48    let s = eid.as_str();
49    let suffix = s
50        .split_once("}:")
51        .map(|(_, u)| u)
52        .expect("ExecutionId has `}:` separator (invariant)");
53    Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
54}
55
/// Current UNIX time in whole milliseconds, clamped to a
/// non-negative `i64`.
fn now_ms() -> i64 {
    let elapsed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    let millis = elapsed.as_millis() as i64;
    millis.max(0)
}
62
63/// Map a `40001` / `40P01` sqlx conflict into the retry loop. Other
64/// `Contention` variants propagate untouched (mirrors
65/// `flow::is_serialization_conflict`).
66fn is_serialization_conflict(err: &EngineError) -> bool {
67    matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
68}
69
70async fn begin_serializable(pool: &PgPool) -> Result<sqlx::Transaction<'_, Postgres>, EngineError> {
71    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
72    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
73        .execute(&mut *tx)
74        .await
75        .map_err(map_sqlx_error)?;
76    Ok(tx)
77}
78
79/// Synthetic Postgres lease identity (the backend has no stable
80/// `lease_id` column — §4.2.2 derives identity from
81/// `(execution_id, attempt_index, lease_epoch)`). Surfaces on
82/// [`RevokeLeaseResult::Revoked`] so callers have a stable string to
83/// round-trip through logs / traces.
84fn synthetic_lease_id(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> String {
85    format!("pg:{exec_uuid}:{attempt_index}:{lease_epoch}")
86}
87
88// ─── cancel_execution (§4.2.1) ────────────────────────────────────
89
90/// Transactional body for [`cancel_execution_impl`].
91///
92/// Returns `Ok(result)` on success (one tx); `Err(e)` for
93/// serialization retries (40001/40P01 → re-enters the outer loop) and
94/// hard failures (NotFound, StaleLease, Validation on terminal
95/// conflict).
96async fn cancel_execution_once(
97    pool: &PgPool,
98    args: &CancelExecutionArgs,
99) -> Result<CancelExecutionResult, EngineError> {
100    let partition_key: i16 = args.execution_id.partition() as i16;
101    let exec_uuid = eid_uuid(&args.execution_id);
102    // Honour caller-supplied `args.now` for `terminal_at_ms` +
103    // `raw_fields.last_mutation_at` + outbox `occurred_at_ms` so
104    // retries + cross-backend comparisons don't drift on the DB host
105    // clock.
106    let now: i64 = args.now.0;
107
108    let mut tx = begin_serializable(pool).await?;
109
110    // Step 2 + 3 — pre-read under lock. Join on the current attempt
111    // row to read `worker_instance_id` + `lease_epoch` (drives the
112    // lease-active decision + CAS fencing).
113    let row = sqlx::query(
114        r#"
115        SELECT ec.lifecycle_phase,
116               ec.public_state,
117               ec.attempt_index,
118               a.worker_instance_id,
119               a.lease_epoch
120          FROM ff_exec_core ec
121          LEFT JOIN ff_attempt a
122            ON a.partition_key = ec.partition_key
123           AND a.execution_id  = ec.execution_id
124           AND a.attempt_index = ec.attempt_index
125         WHERE ec.partition_key = $1 AND ec.execution_id = $2
126         FOR NO KEY UPDATE OF ec
127        "#,
128    )
129    .bind(partition_key)
130    .bind(exec_uuid)
131    .fetch_optional(&mut *tx)
132    .await
133    .map_err(map_sqlx_error)?;
134
135    let Some(row) = row else {
136        tx.rollback().await.map_err(map_sqlx_error)?;
137        // Valkey path returns `execution_not_found` → NotFound
138        // (`ScriptError::ExecutionNotFound`). Mirror that here.
139        return Err(EngineError::NotFound {
140            entity: "execution",
141        });
142    };
143
144    let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
145    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
146    let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
147    let worker_instance_id: Option<String> =
148        row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
149    let lease_epoch: Option<i64> = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
150
151    // Terminal-state handling — idempotent replay if already
152    // cancelled; hard conflict if terminal with a different outcome.
153    // Mirrors `exec_core::cancel_impl` to keep operator-visible
154    // semantics stable.
155    if matches!(lifecycle_phase.as_str(), "terminal" | "cancelled") {
156        tx.rollback().await.map_err(map_sqlx_error)?;
157        return if public_state == "cancelled" {
158            Ok(CancelExecutionResult::Cancelled {
159                execution_id: args.execution_id.clone(),
160                public_state: PublicState::Cancelled,
161            })
162        } else {
163            Err(EngineError::Validation {
164                kind: ValidationKind::InvalidInput,
165                detail: format!(
166                    "cancel_execution: execution_id={}: already terminal in state '{}'",
167                    args.execution_id, public_state
168                ),
169            })
170        };
171    }
172
173    // Lease fence validation (Valkey parity — only enforced when
174    // `source != CancelSource::OperatorOverride`). Per
175    // `CancelExecutionArgs` docstring, `lease_epoch` is REQUIRED
176    // when source is not operator_override AND the execution is
177    // active; missing input on that path surfaces a `Validation`
178    // error rather than silently bypassing. The Valkey side keys
179    // fence on `current_lease_id` + `current_lease_epoch`; Postgres
180    // has no stable `lease_id` column, so we fence on `lease_epoch`.
181    // `lease_id` in args is ignored on Postgres (documented parity
182    // gap — matches the `lease_event::emit` docs for `lease_id = None`).
183    let lease_active = worker_instance_id
184        .as_deref()
185        .is_some_and(|s| !s.is_empty());
186    if !matches!(args.source, CancelSource::OperatorOverride) && lease_active {
187        let Some(expected_epoch) = args.lease_epoch.as_ref() else {
188            tx.rollback().await.map_err(map_sqlx_error)?;
189            return Err(EngineError::Validation {
190                kind: ValidationKind::InvalidInput,
191                detail: format!(
192                    "cancel_execution: execution_id={}: lease_epoch required when source != operator_override and execution is active",
193                    args.execution_id
194                ),
195            });
196        };
197        let expected: i64 = i64::try_from(expected_epoch.0).unwrap_or(i64::MAX);
198        if lease_epoch.unwrap_or(0) != expected {
199            tx.rollback().await.map_err(map_sqlx_error)?;
200            return Err(EngineError::State(
201                ff_core::engine_error::StateKind::StaleLease,
202            ));
203        }
204    }
205
206    // Step 4a — flip exec_core to cancelled. Matches the existing
207    // `flow::cancel_flow` member-cancel shape (flow.rs:672) so the
208    // terminal lifecycle literal stays `'cancelled'` (not `'terminal'`
209    // + `terminal_outcome='cancelled'` — the Valkey encoding). Per
210    // RFC §4.2.1 this is the Postgres-canonical encoding.
211    sqlx::query(
212        r#"
213        UPDATE ff_exec_core
214           SET lifecycle_phase     = 'cancelled',
215               ownership_state     = 'unowned',
216               eligibility_state   = 'not_applicable',
217               public_state        = 'cancelled',
218               attempt_state       = 'cancelled',
219               terminal_at_ms      = COALESCE(terminal_at_ms, $3),
220               cancellation_reason = COALESCE(cancellation_reason, $4),
221               cancelled_by        = COALESCE(cancelled_by, $5),
222               raw_fields          = jsonb_set(raw_fields,
223                                               '{last_mutation_at}',
224                                               to_jsonb($3::text))
225         WHERE partition_key = $1 AND execution_id = $2
226        "#,
227    )
228    .bind(partition_key)
229    .bind(exec_uuid)
230    .bind(now)
231    .bind(&args.reason)
232    .bind(args.source.as_str())
233    .execute(&mut *tx)
234    .await
235    .map_err(map_sqlx_error)?;
236
237    // Step 4b — clear lease on the current attempt row if one was
238    // active. Zeroes `worker_instance_id` + `lease_expires_at_ms` so
239    // the `lease_expiry` reconciler + `claim_from_reclaim` path see a
240    // released slot. `lease_epoch` bumps by 1 to fence any in-flight
241    // RMW (same pattern as `revoke_lease`).
242    if lease_active {
243        sqlx::query(
244            r#"
245            UPDATE ff_attempt
246               SET worker_instance_id   = NULL,
247                   lease_expires_at_ms  = NULL,
248                   lease_epoch          = lease_epoch + 1,
249                   terminal_at_ms       = $4,
250                   outcome              = 'cancelled'
251             WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
252            "#,
253        )
254        .bind(partition_key)
255        .bind(exec_uuid)
256        .bind(attempt_index)
257        .bind(now)
258        .execute(&mut *tx)
259        .await
260        .map_err(map_sqlx_error)?;
261
262        // Step 5 — outbox emit (§4.2.7). Only when a lease was
263        // active; an already-unowned exec generates no lease event
264        // (matches Valkey's `ff_cancel_execution` which XADDs a
265        // `released` history event only on the active path).
266        lease_event::emit(
267            &mut tx,
268            partition_key,
269            exec_uuid,
270            None, // Postgres has no stable lease_id — see lease_event::emit docs
271            lease_event::EVENT_REVOKED,
272            now,
273        )
274        .await?;
275    }
276
277    tx.commit().await.map_err(map_sqlx_error)?;
278    Ok(CancelExecutionResult::Cancelled {
279        execution_id: args.execution_id.clone(),
280        public_state: PublicState::Cancelled,
281    })
282}
283
284/// `EngineBackend::cancel_execution` impl — SERIALIZABLE + 3-attempt
285/// retry loop on 40001 / 40P01.
286pub(super) async fn cancel_execution_impl(
287    pool: &PgPool,
288    args: CancelExecutionArgs,
289) -> Result<CancelExecutionResult, EngineError> {
290    let mut last: Option<EngineError> = None;
291    for attempt in 0..MAX_ATTEMPTS {
292        match cancel_execution_once(pool, &args).await {
293            Ok(r) => return Ok(r),
294            Err(err) => {
295                if is_serialization_conflict(&err) {
296                    if attempt + 1 < MAX_ATTEMPTS {
297                        let ms = 5u64 * (1u64 << attempt);
298                        tokio::time::sleep(Duration::from_millis(ms)).await;
299                    }
300                    last = Some(err);
301                    continue;
302                }
303                return Err(err);
304            }
305        }
306    }
307    let _ = last;
308    Err(EngineError::Contention(ContentionKind::RetryExhausted))
309}
310
311// ─── revoke_lease (§4.2.2) ────────────────────────────────────────
312
/// Transactional body for [`revoke_lease_impl`] (§4.2.2).
///
/// One SERIALIZABLE tx: lock the `ff_exec_core` row (pins
/// `attempt_index`), lock the current `ff_attempt` row, validate the
/// caller-supplied worker / lease-id fences, CAS-bump `lease_epoch`,
/// flip the exec back to runnable, and emit the `revoked` outbox row.
/// All idempotent outcomes surface as
/// [`RevokeLeaseResult::AlreadySatisfied`] with a machine-readable
/// `reason` string.
async fn revoke_lease_once(
    pool: &PgPool,
    args: &RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let partition_key: i16 = args.execution_id.partition() as i16;
    let exec_uuid = eid_uuid(&args.execution_id);
    // NOTE(review): unlike `cancel_execution_once` (which honours a
    // caller-supplied `args.now`), this uses the local wall clock —
    // confirm `RevokeLeaseArgs` has no timestamp field.
    let now = now_ms();

    let mut tx = begin_serializable(pool).await?;

    // Pre-read: `ff_exec_core` under UPDATE lock to pin the current
    // attempt_index (defends against a concurrent `replay_execution`
    // that would bump it). `FOR NO KEY UPDATE` is unsupported on
    // nullable outer-join sides (pg 0A000), so we lock `ec` solo and
    // re-read the attempt row separately under its own `FOR UPDATE`.
    let ec_row = sqlx::query(
        r#"
        SELECT attempt_index
          FROM ff_exec_core
         WHERE partition_key = $1 AND execution_id = $2
         FOR NO KEY UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Missing exec row → NotFound (mirrors `cancel_execution_once`).
    let Some(ec_row) = ec_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };
    let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;

    // Attempt row may not exist yet (pre-claim exec). Lock with
    // FOR UPDATE so the `lease_expiry` reconciler serializes with us.
    let att_row = sqlx::query(
        r#"
        SELECT worker_instance_id, lease_epoch
          FROM ff_attempt
         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
         FOR UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(attempt_index)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // No attempt row behaves exactly like an unleased attempt row:
    // both collapse to `(None, None)`.
    let (worker_instance_id, lease_epoch): (Option<String>, Option<i64>) = match att_row {
        Some(r) => (
            r.try_get("worker_instance_id").map_err(map_sqlx_error)?,
            r.try_get("lease_epoch").map_err(map_sqlx_error)?,
        ),
        None => (None, None),
    };

    let lease_active = worker_instance_id
        .as_deref()
        .is_some_and(|s| !s.is_empty());
    if !lease_active {
        // Valkey parity — `RevokeLeaseResult::AlreadySatisfied` when
        // the exec has no active lease (§4.2.2 +
        // `valkey/lib.rs:5881-5892`).
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "no_active_lease".to_owned(),
        });
    }

    // Targeted-revoke fence: the caller identifies a specific owner
    // via `args.worker_instance_id`. If the locked attempt is held
    // by a different worker, surface `AlreadySatisfied` (idempotent
    // parity — the targeted lease is already gone, from this
    // caller's perspective). An empty caller-supplied wiid is a
    // wildcard (matches Valkey's `revoke_lease: HGET
    // current_worker_instance_id` fallback at `lib.rs:5869-5899`).
    let caller_wiid = args.worker_instance_id.as_str();
    if !caller_wiid.is_empty()
        && worker_instance_id.as_deref() != Some(caller_wiid)
    {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "different_worker_instance_id".to_owned(),
        });
    }

    let prior_epoch = lease_epoch.unwrap_or(0);

    // Optional `expected_lease_id` fence. Valkey uses a stable
    // lease_id; Postgres synthesises one from
    // `(execution_id, attempt_index, lease_epoch)`. Empty string is
    // the documented "skip check" path on the args docstring.
    if let Some(expected) = args
        .expected_lease_id
        .as_ref()
        .filter(|s| !s.is_empty())
    {
        let current_id = synthetic_lease_id(exec_uuid, attempt_index, prior_epoch);
        if expected != &current_id {
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Ok(RevokeLeaseResult::AlreadySatisfied {
                reason: "lease_id_mismatch".to_owned(),
            });
        }
    }

    // CAS on `lease_epoch`. Row-count = 0 → another writer bumped
    // the epoch (reconciler or concurrent revoke) → `AlreadySatisfied`
    // (§4.2.2 `reason: "epoch_moved"`).
    let affected = sqlx::query(
        r#"
        UPDATE ff_attempt
           SET worker_instance_id   = NULL,
               lease_expires_at_ms  = NULL,
               lease_epoch          = lease_epoch + 1
         WHERE partition_key = $1
           AND execution_id  = $2
           AND attempt_index = $3
           AND lease_epoch   = $4
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(attempt_index)
    .bind(prior_epoch)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?
    .rows_affected();

    if affected == 0 {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "epoch_moved".to_owned(),
        });
    }

    // Flip `ff_exec_core` back to reclaimable-runnable so the normal
    // claim path picks it up — mirrors
    // `reconcilers::lease_expiry::release_one` (lease_expiry.rs:156-173).
    // Gated on `lifecycle_phase = 'active'` to avoid touching a row
    // that's been concurrently terminated (cancelled / completed).
    sqlx::query(
        r#"
        UPDATE ff_exec_core
           SET lifecycle_phase   = 'runnable',
               ownership_state   = 'unowned',
               eligibility_state = 'eligible_now',
               attempt_state     = 'attempt_interrupted',
               raw_fields        = jsonb_set(raw_fields,
                                             '{last_mutation_at}',
                                             to_jsonb($3::text))
         WHERE partition_key = $1 AND execution_id = $2
           AND lifecycle_phase = 'active'
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Step 5 — outbox emit (§4.2.7 — `ff_lease_event event_type=revoked`).
    lease_event::emit(
        &mut tx,
        partition_key,
        exec_uuid,
        None,
        lease_event::EVENT_REVOKED,
        now,
    )
    .await?;

    tx.commit().await.map_err(map_sqlx_error)?;

    // `lease_id` names the lease that was revoked (pre-bump epoch);
    // `lease_epoch` reports the post-bump fence value.
    Ok(RevokeLeaseResult::Revoked {
        lease_id: synthetic_lease_id(exec_uuid, attempt_index, prior_epoch),
        lease_epoch: (prior_epoch + 1).to_string(),
    })
}
500
501/// `EngineBackend::revoke_lease` impl — SERIALIZABLE + 3-attempt
502/// retry loop on 40001 / 40P01.
503pub(super) async fn revoke_lease_impl(
504    pool: &PgPool,
505    args: RevokeLeaseArgs,
506) -> Result<RevokeLeaseResult, EngineError> {
507    let mut last: Option<EngineError> = None;
508    for attempt in 0..MAX_ATTEMPTS {
509        match revoke_lease_once(pool, &args).await {
510            Ok(r) => return Ok(r),
511            Err(err) => {
512                if is_serialization_conflict(&err) {
513                    if attempt + 1 < MAX_ATTEMPTS {
514                        let ms = 5u64 * (1u64 << attempt);
515                        tokio::time::sleep(Duration::from_millis(ms)).await;
516                    }
517                    last = Some(err);
518                    continue;
519                }
520                return Err(err);
521            }
522        }
523    }
524    let _ = last;
525    Err(EngineError::Contention(ContentionKind::RetryExhausted))
526}
527
528// ─── change_priority (§4.2.4, Rev 7 Fork 3) ────────────────────────
529
530/// Shared retry wrapper — SERIALIZABLE + 3-attempt retry on 40001 /
531/// 40P01 (§4.2 template).
532async fn retry_serializable<F, Fut, T>(mut f: F) -> Result<T, EngineError>
533where
534    F: FnMut() -> Fut,
535    Fut: std::future::Future<Output = Result<T, EngineError>>,
536{
537    let mut last: Option<EngineError> = None;
538    for attempt in 0..MAX_ATTEMPTS {
539        match f().await {
540            Ok(v) => return Ok(v),
541            Err(err) => {
542                if is_serialization_conflict(&err) {
543                    if attempt + 1 < MAX_ATTEMPTS {
544                        let ms = 5u64 * (1u64 << attempt);
545                        tokio::time::sleep(Duration::from_millis(ms)).await;
546                    }
547                    last = Some(err);
548                    continue;
549                }
550                return Err(err);
551            }
552        }
553    }
554    let _ = last;
555    Err(EngineError::Contention(ContentionKind::RetryExhausted))
556}
557
/// Transactional body for [`change_priority_impl`] (§4.2.4, Rev 7
/// Fork 3): SERIALIZABLE pre-read + eligibility gate, clamped
/// priority write, `priority_changed` outbox emit.
async fn change_priority_once(
    pool: &PgPool,
    args: &ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let partition_key: i16 = args.execution_id.partition() as i16;
    let exec_uuid = eid_uuid(&args.execution_id);
    // Caller-supplied timestamp — keeps `raw_fields.last_mutation_at`
    // + outbox `occurred_at_ms` off the DB-host clock (same rationale
    // as `cancel_execution_once`).
    let now: i64 = args.now.0;

    let mut tx = begin_serializable(pool).await?;

    // Pre-read under NO KEY UPDATE lock. Gate fields mirror the Valkey
    // Lua check (`flowfabric.lua:3683-3688`): lifecycle_phase = 'runnable'
    // AND eligibility_state = 'eligible_now' — any other state surfaces
    // `execution_not_eligible` (Rev 7 Fork 3, Option C).
    let row = sqlx::query(
        r#"
        SELECT lifecycle_phase, eligibility_state, priority
          FROM ff_exec_core
         WHERE partition_key = $1 AND execution_id = $2
         FOR NO KEY UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Missing exec row → NotFound (mirrors `cancel_execution_once`).
    let Some(row) = row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };

    let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let eligibility_state: String = row.try_get("eligibility_state").map_err(map_sqlx_error)?;
    // Old priority is captured only for the outbox event payload.
    let old_priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;

    // Valkey-canonical gate (`flowfabric.lua:3683-3688`). Either
    // failing condition surfaces `execution_not_eligible` (fail-closed).
    if lifecycle_phase != "runnable" || eligibility_state != "eligible_now" {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::Contention(
            ContentionKind::ExecutionNotEligible,
        ));
    }

    // Clamp new_priority to [0, 9000] matching Valkey's
    // `ff_change_priority` (flowfabric.lua:3695-3696 — "same as
    // ff_create_execution").
    let new_priority = args.new_priority.clamp(0, 9000);

    // Mutate. Repeat the gate in the WHERE clause as belt-and-
    // suspenders; row-count = 0 on concurrent transition between
    // the pre-read and UPDATE surfaces the same `execution_not_eligible`
    // error.
    let affected = sqlx::query(
        r#"
        UPDATE ff_exec_core
           SET priority   = $3,
               raw_fields = jsonb_set(raw_fields,
                                      '{last_mutation_at}',
                                      to_jsonb($4::text))
         WHERE partition_key = $1 AND execution_id = $2
           AND lifecycle_phase   = 'runnable'
           AND eligibility_state = 'eligible_now'
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(new_priority)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?
    .rows_affected();

    if affected == 0 {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::Contention(
            ContentionKind::ExecutionNotEligible,
        ));
    }

    // Outbox emit (§4.2.7 — ff_operator_event event_type='priority_changed').
    operator_event::emit(
        &mut tx,
        partition_key,
        exec_uuid,
        operator_event::EVENT_PRIORITY_CHANGED,
        json!({
            "old_priority": old_priority,
            "new_priority": new_priority,
        }),
        now,
    )
    .await?;

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(ChangePriorityResult::Changed {
        execution_id: args.execution_id.clone(),
    })
}
663
664pub(super) async fn change_priority_impl(
665    pool: &PgPool,
666    args: ChangePriorityArgs,
667) -> Result<ChangePriorityResult, EngineError> {
668    retry_serializable(|| change_priority_once(pool, &args)).await
669}
670
671// ─── replay_execution (§4.2.5, Rev 7 Forks 1 + 2) ──────────────────
672
673async fn replay_execution_once(
674    pool: &PgPool,
675    args: &ReplayExecutionArgs,
676) -> Result<ReplayExecutionResult, EngineError> {
677    let partition_key: i16 = args.execution_id.partition() as i16;
678    let exec_uuid = eid_uuid(&args.execution_id);
679    let now: i64 = args.now.0;
680
681    let mut tx = begin_serializable(pool).await?;
682
683    // Pre-read under lock. Read the current attempt's `outcome` so we
684    // can derive `terminal_outcome` per `exec_core::derive_terminal_outcome`.
685    let ec_row = sqlx::query(
686        r#"
687        SELECT lifecycle_phase, flow_id, attempt_index, priority, raw_fields
688          FROM ff_exec_core
689         WHERE partition_key = $1 AND execution_id = $2
690         FOR NO KEY UPDATE
691        "#,
692    )
693    .bind(partition_key)
694    .bind(exec_uuid)
695    .fetch_optional(&mut *tx)
696    .await
697    .map_err(map_sqlx_error)?;
698
699    let Some(ec_row) = ec_row else {
700        tx.rollback().await.map_err(map_sqlx_error)?;
701        return Err(EngineError::NotFound {
702            entity: "execution",
703        });
704    };
705
706    let lifecycle_phase: String = ec_row
707        .try_get("lifecycle_phase")
708        .map_err(map_sqlx_error)?;
709    let flow_id: Option<Uuid> = ec_row.try_get("flow_id").map_err(map_sqlx_error)?;
710    let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;
711
712    // Valkey-canonical gate: `lifecycle_phase = 'terminal'`
713    // (`flowfabric.lua:8535-8537` → `err("execution_not_terminal")`).
714    if lifecycle_phase != "terminal" {
715        tx.rollback().await.map_err(map_sqlx_error)?;
716        return Err(EngineError::State(StateKind::ExecutionNotTerminal));
717    }
718
719    // Read the current attempt's outcome for the skipped-flow-member
720    // branch check.
721    let att_row = sqlx::query(
722        r#"
723        SELECT outcome
724          FROM ff_attempt
725         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
726         FOR UPDATE
727        "#,
728    )
729    .bind(partition_key)
730    .bind(exec_uuid)
731    .bind(attempt_index)
732    .fetch_optional(&mut *tx)
733    .await
734    .map_err(map_sqlx_error)?;
735
736    let attempt_outcome: Option<String> = match att_row.as_ref() {
737        Some(r) => r.try_get("outcome").map_err(map_sqlx_error)?,
738        None => None,
739    };
740
741    // Branch selector matches Valkey (`flowfabric.lua:8555`).
742    // Postgres `terminal_outcome` is derived, not stored; mirror
743    // `exec_core::derive_terminal_outcome` at the SQL level.
744    let is_skipped_flow_member =
745        attempt_outcome.as_deref() == Some("skipped") && flow_id.is_some();
746
747    // Skipped-flow-member branch (Rev 7 Fork 1 Option A).
748    // Reset downstream edge-group counters: skip/fail/running → 0,
749    // success preserved. Valkey ground-truth:
750    // `flowfabric.lua:8580` comment "satisfied edges remain satisfied".
751    let groups_reset: i64 = if is_skipped_flow_member {
752        let count = sqlx::query(
753            r#"
754            UPDATE ff_edge_group
755               SET skip_count    = 0,
756                   fail_count    = 0,
757                   running_count = 0
758             WHERE (partition_key, flow_id, downstream_eid) IN (
759               SELECT DISTINCT e.partition_key, e.flow_id, e.downstream_eid
760                 FROM ff_edge e
761                WHERE e.partition_key   = $1
762                  AND e.downstream_eid = $2
763             )
764            "#,
765        )
766        .bind(partition_key)
767        .bind(exec_uuid)
768        .execute(&mut *tx)
769        .await
770        .map_err(map_sqlx_error)?
771        .rows_affected();
772        count as i64
773    } else {
774        0
775    };
776
777    // Both branches: in-place mutate `ff_exec_core` to
778    // `lifecycle_phase='runnable'` — matches Valkey's base+skip
779    // both writing `runnable` (flowfabric.lua:8591 + 8625).
780    // `attempt_index` NOT bumped (Rev 7 Fork 2 Option A).
781    //
782    // Secondary state differs per branch per §4.2.5:
783    //  - normal:  eligibility_state='eligible_now', public_state='waiting'
784    //  - skipped: eligibility_state='blocked_by_dependencies',
785    //             public_state='waiting_children'
786    //
787    // raw_fields.replay_count bumped (+1). Valkey bumps
788    // exec_core.replay_count; Postgres stores it in raw_fields.
789    let (eligibility_state, public_state) = if is_skipped_flow_member {
790        ("blocked_by_dependencies", "waiting_children")
791    } else {
792        ("eligible_now", "waiting")
793    };
794
795    sqlx::query(
796        r#"
797        UPDATE ff_exec_core
798           SET lifecycle_phase      = 'runnable',
799               ownership_state      = 'unowned',
800               eligibility_state    = $3,
801               public_state         = $4,
802               attempt_state        = 'pending_replay_attempt',
803               terminal_at_ms       = NULL,
804               result               = NULL,
805               cancellation_reason  = NULL,
806               cancelled_by         = NULL,
807               raw_fields           = jsonb_set(
808                 jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($5::text)),
809                 '{replay_count}',
810                 to_jsonb(COALESCE((raw_fields->>'replay_count')::int, 0) + 1)
811               )
812         WHERE partition_key = $1 AND execution_id = $2
813        "#,
814    )
815    .bind(partition_key)
816    .bind(exec_uuid)
817    .bind(eligibility_state)
818    .bind(public_state)
819    .bind(now)
820    .execute(&mut *tx)
821    .await
822    .map_err(map_sqlx_error)?;
823
824    // In-place mutate the current `ff_attempt` row (Rev 7 Fork 2
825    // Option A). Reset outcome + lease state; bump lease_epoch to
826    // fence any in-flight RMW. No new attempt row inserted; no
827    // historical row retained. Only columns that exist on the real
828    // schema are touched (`ff_attempt` has no `lease_id` / `result` /
829    // `error_code` columns — see migrations/0001_initial.sql:160-176).
830    if att_row.is_some() {
831        sqlx::query(
832            r#"
833            UPDATE ff_attempt
834               SET outcome              = NULL,
835                   terminal_at_ms       = NULL,
836                   worker_id            = NULL,
837                   worker_instance_id   = NULL,
838                   lease_expires_at_ms  = NULL,
839                   lease_epoch          = lease_epoch + 1
840             WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
841            "#,
842        )
843        .bind(partition_key)
844        .bind(exec_uuid)
845        .bind(attempt_index)
846        .execute(&mut *tx)
847        .await
848        .map_err(map_sqlx_error)?;
849    }
850
851    // Outbox emit (§4.2.7 — ff_operator_event event_type='replayed').
852    let details = if is_skipped_flow_member {
853        json!({
854            "branch": "skipped_flow_member",
855            "groups_reset": groups_reset,
856        })
857    } else {
858        json!({
859            "branch": "normal",
860        })
861    };
862    operator_event::emit(
863        &mut tx,
864        partition_key,
865        exec_uuid,
866        operator_event::EVENT_REPLAYED,
867        details,
868        now,
869    )
870    .await?;
871
872    tx.commit().await.map_err(map_sqlx_error)?;
873
874    let ps = if is_skipped_flow_member {
875        PublicState::WaitingChildren
876    } else {
877        PublicState::Waiting
878    };
879    Ok(ReplayExecutionResult::Replayed { public_state: ps })
880}
881
882pub(super) async fn replay_execution_impl(
883    pool: &PgPool,
884    args: ReplayExecutionArgs,
885) -> Result<ReplayExecutionResult, EngineError> {
886    retry_serializable(|| replay_execution_once(pool, &args)).await
887}
888
889// ─── cancel_flow_header (§4.2.3) ───────────────────────────────────
890
891/// Format a member execution UUID as the wire-form `ExecutionId`
892/// string (`{fp:N}:<uuid>`) expected by downstream consumers.
893fn member_wire_id(partition_key: i16, exec_uuid: Uuid) -> String {
894    format!("{{fp:{partition_key}}}:{exec_uuid}")
895}
896
/// Single-shot body of `cancel_flow_header` (§4.2.3).
///
/// In one SERIALIZABLE transaction: locks the flow header row, and
/// either (a) replays idempotently — flow already terminal — returning
/// stored policy/reason plus the member enumeration, or (b) performs a
/// fresh cancel: flips `ff_flow_core` to `cancelled`, inserts the
/// cancel-backlog header, enumerates in-flight members, bulk-inserts
/// backlog member rows, bulk-cancels the members' `ff_exec_core` rows,
/// emits the `flow_cancel_requested` outbox row, and commits.
///
/// Returns `EngineError::NotFound` when the flow row is absent.
/// Serialization/deadlock failures (`40001` / `40P01`) are retried by
/// the caller via `retry_serializable` in `cancel_flow_header_impl`.
async fn cancel_flow_header_once(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    args: &CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
    let flow_uuid: Uuid = args.flow_id.0;
    // Flow + members share `partition_key` via the flow's partition
    // hash (RFC-011 co-location). Use the backend's configured
    // `partition_config` so non-default `num_flow_partitions`
    // deployments route correctly.
    let partition_key: i16 =
        ff_core::partition::flow_partition(&args.flow_id, partition_config).index as i16;
    let now: i64 = args.now.0;

    let mut tx = begin_serializable(pool).await?;

    // Pre-read the flow row under lock. Idempotent-replay trigger is
    // `public_flow_state` — flows already in a terminal state surface
    // `AlreadyTerminal` with stored policy/reason from `raw_fields` +
    // either the prior backlog-member enumeration (if we landed the
    // first flip) or live `ff_exec_core` membership (pre-E2-shape
    // flow cancelled by the legacy `flow::cancel_flow` path).
    // `FOR NO KEY UPDATE` locks the row without blocking FK readers.
    let flow_row = sqlx::query(
        r#"
        SELECT public_flow_state, raw_fields
          FROM ff_flow_core
         WHERE partition_key = $1 AND flow_id = $2
         FOR NO KEY UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(flow_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(flow_row) = flow_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound { entity: "flow" });
    };

    let public_flow_state: String = flow_row
        .try_get("public_flow_state")
        .map_err(map_sqlx_error)?;
    let raw_fields: serde_json::Value = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;

    // Idempotent-replay path — flow already in a terminal state. Read
    // stored policy / reason (from raw_fields — `flow::cancel_flow`
    // writes `cancellation_policy` there) + the backlog-member rows
    // (our prior enumeration). Both fields are `None` when absent.
    if matches!(public_flow_state.as_str(), "cancelled" | "completed" | "failed") {
        let stored_cancellation_policy = raw_fields
            .get("cancellation_policy")
            .and_then(|v| v.as_str())
            .map(str::to_owned);
        let stored_cancel_reason = raw_fields
            .get("cancel_reason")
            .and_then(|v| v.as_str())
            .map(str::to_owned);

        // Return the enumerated members from the backlog if we have
        // one; otherwise fall back to live `ff_exec_core` membership
        // (matches Valkey's SMEMBERS pattern on flow_members_set).
        let member_rows = sqlx::query(
            r#"
            SELECT execution_id
              FROM ff_cancel_backlog_member
             WHERE partition_key = $1 AND flow_id = $2
            "#,
        )
        .bind(partition_key)
        .bind(flow_uuid)
        .fetch_all(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        let members: Vec<String> = if member_rows.is_empty() {
            // Pre-E2-shape flow: enumerate live members from exec_core.
            // exec_core stores UUIDs, so re-wrap them into wire form;
            // backlog rows (else-branch) already store the wire string.
            let live = sqlx::query(
                r#"
                SELECT execution_id
                  FROM ff_exec_core
                 WHERE partition_key = $1 AND flow_id = $2
                "#,
            )
            .bind(partition_key)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
            live.iter()
                .map(|r| {
                    let u: Uuid = r.get("execution_id");
                    member_wire_id(partition_key, u)
                })
                .collect()
        } else {
            member_rows
                .iter()
                .map(|r| r.get::<String, _>("execution_id"))
                .collect()
        };

        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowHeader::AlreadyTerminal {
            stored_cancellation_policy,
            stored_cancel_reason,
            member_execution_ids: members,
        });
    }

    // Fresh cancel — flip flow_core + insert backlog header + enumerate
    // + bulk-insert backlog members + flip exec_core lifecycle_phase
    // per member (matches `flow::cancel_flow_once` pattern).

    // COALESCE keeps any earlier terminal_at_ms; the `||` merge stamps
    // policy/reason into raw_fields for the idempotent-replay path.
    sqlx::query(
        r#"
        UPDATE ff_flow_core
           SET public_flow_state = 'cancelled',
               terminal_at_ms    = COALESCE(terminal_at_ms, $3),
               raw_fields        = raw_fields
                                    || jsonb_build_object(
                                         'cancellation_policy', $4::text,
                                         'cancel_reason',       $5::text)
         WHERE partition_key = $1 AND flow_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(flow_uuid)
    .bind(now)
    .bind(&args.cancellation_policy)
    .bind(&args.reason)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // NOTE(review): `requester` is stored as '' — `args` carries no
    // requester identity on this path; confirm that is intended.
    // ON CONFLICT DO NOTHING keeps a pre-existing header untouched.
    sqlx::query(
        r#"
        INSERT INTO ff_cancel_backlog
            (partition_key, flow_id, requested_at_ms, requester, reason,
             cancellation_policy, status)
        VALUES ($1, $2, $3, '', $4, $5, 'pending')
        ON CONFLICT (partition_key, flow_id) DO NOTHING
        "#,
    )
    .bind(partition_key)
    .bind(flow_uuid)
    .bind(now)
    .bind(&args.reason)
    .bind(&args.cancellation_policy)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Enumerate in-flight members (already-terminal / already-cancelled
    // executions are excluded from the backlog and the bulk cancel).
    let member_rows = sqlx::query(
        r#"
        SELECT execution_id
          FROM ff_exec_core
         WHERE partition_key = $1 AND flow_id = $2
           AND lifecycle_phase NOT IN ('terminal','cancelled')
        "#,
    )
    .bind(partition_key)
    .bind(flow_uuid)
    .fetch_all(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let member_uuids: Vec<Uuid> = member_rows.iter().map(|r| r.get("execution_id")).collect();
    let member_execution_ids: Vec<String> = member_uuids
        .iter()
        .map(|u| member_wire_id(partition_key, *u))
        .collect();

    if !member_uuids.is_empty() {
        // Bulk INSERT backlog members via UNNEST — one round-trip
        // regardless of membership cardinality (vs. N round-trips in
        // the prior per-member loop).
        sqlx::query(
            r#"
            INSERT INTO ff_cancel_backlog_member
                (partition_key, flow_id, execution_id)
            SELECT $1, $2, eid
              FROM UNNEST($3::text[]) AS eid
            ON CONFLICT (partition_key, flow_id, execution_id) DO NOTHING
            "#,
        )
        .bind(partition_key)
        .bind(flow_uuid)
        .bind(&member_execution_ids)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Bulk UPDATE each member's `ff_exec_core` lifecycle (same
        // shape as `flow::cancel_flow_once`). The Server's own
        // wait/async machinery drives per-member cancel events
        // downstream; the backlog rows live until `ack_cancel_member`
        // drains them. COALESCE preserves any reason/actor already set.
        sqlx::query(
            r#"
            UPDATE ff_exec_core
               SET lifecycle_phase     = 'cancelled',
                   eligibility_state   = 'cancelled',
                   public_state        = 'cancelled',
                   terminal_at_ms      = COALESCE(terminal_at_ms, $3),
                   cancellation_reason = COALESCE(cancellation_reason, $4),
                   cancelled_by        = COALESCE(cancelled_by, 'cancel_flow_header')
             WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])
            "#,
        )
        .bind(partition_key)
        .bind(&member_uuids)
        .bind(now)
        .bind(&args.reason)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    // Outbox emit (§4.2.7 — flow-level `flow_cancel_requested`).
    operator_event::emit(
        &mut tx,
        partition_key,
        flow_uuid,
        operator_event::EVENT_FLOW_CANCEL_REQUESTED,
        json!({
            "flow_id": flow_uuid.to_string(),
            "cancellation_policy": &args.cancellation_policy,
            "reason": &args.reason,
            "member_count": member_execution_ids.len(),
        }),
        now,
    )
    .await?;

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(CancelFlowHeader::Cancelled {
        cancellation_policy: args.cancellation_policy.clone(),
        member_execution_ids,
    })
}
1141
1142pub(super) async fn cancel_flow_header_impl(
1143    pool: &PgPool,
1144    partition_config: &ff_core::partition::PartitionConfig,
1145    args: CancelFlowArgs,
1146) -> Result<CancelFlowHeader, EngineError> {
1147    retry_serializable(|| cancel_flow_header_once(pool, partition_config, &args)).await
1148}
1149
1150// ─── ack_cancel_member (§4.2.3) ────────────────────────────────────
1151
1152async fn ack_cancel_member_once(
1153    pool: &PgPool,
1154    partition_config: &ff_core::partition::PartitionConfig,
1155    flow_id: &FlowId,
1156    execution_id: &ExecutionId,
1157) -> Result<(), EngineError> {
1158    let flow_uuid: Uuid = flow_id.0;
1159    let partition_key: i16 =
1160        ff_core::partition::flow_partition(flow_id, partition_config).index as i16;
1161    let member_wire = execution_id.as_str();
1162
1163    let mut tx = begin_serializable(pool).await?;
1164
1165    // §4.2.3 — member-drain + conditional parent-DELETE. Under
1166    // SERIALIZABLE, concurrent ack × ack race: both TXs read identical
1167    // snapshots of `ff_cancel_backlog_member`; the losing TX gets
1168    // 40001 at COMMIT and is retried by `retry_serializable` — the
1169    // retry observes the winner's state and the member-DELETE becomes
1170    // a no-op (0 rows), the parent-DELETE predicate re-evaluates
1171    // against post-winner state.
1172    //
1173    // Implementation note: Postgres data-modifying CTEs share a
1174    // snapshot across all statements in the WITH clause — a CTE-form
1175    // `WITH deleted_member AS (DELETE ...) DELETE FROM parent WHERE
1176    // NOT EXISTS (...)` leaves the parent behind on the last-member
1177    // drain because the outer DELETE's `NOT EXISTS` still sees the
1178    // about-to-be-deleted row in the snapshot. Two statements in the
1179    // same tx observe the prior statement's writes — SERIALIZABLE
1180    // isolation still holds against concurrent acks.
1181    //
1182    // NO outbox emit (§4.2.7 — Valkey-parity quiet).
1183    sqlx::query(
1184        r#"
1185        DELETE FROM ff_cancel_backlog_member
1186         WHERE partition_key = $1
1187           AND flow_id       = $2
1188           AND execution_id  = $3
1189        "#,
1190    )
1191    .bind(partition_key)
1192    .bind(flow_uuid)
1193    .bind(member_wire)
1194    .execute(&mut *tx)
1195    .await
1196    .map_err(map_sqlx_error)?;
1197
1198    sqlx::query(
1199        r#"
1200        DELETE FROM ff_cancel_backlog
1201         WHERE partition_key = $1
1202           AND flow_id       = $2
1203           AND NOT EXISTS (
1204             SELECT 1 FROM ff_cancel_backlog_member
1205              WHERE partition_key = $1 AND flow_id = $2
1206           )
1207        "#,
1208    )
1209    .bind(partition_key)
1210    .bind(flow_uuid)
1211    .execute(&mut *tx)
1212    .await
1213    .map_err(map_sqlx_error)?;
1214
1215    tx.commit().await.map_err(map_sqlx_error)?;
1216    Ok(())
1217}
1218
1219pub(super) async fn ack_cancel_member_impl(
1220    pool: &PgPool,
1221    partition_config: &ff_core::partition::PartitionConfig,
1222    flow_id: FlowId,
1223    execution_id: ExecutionId,
1224) -> Result<(), EngineError> {
1225    retry_serializable(|| {
1226        ack_cancel_member_once(pool, partition_config, &flow_id, &execution_id)
1227    })
1228    .await
1229}