Skip to main content

ff_backend_postgres/
exec_core.rs

1//! exec_core trait-method family — Postgres impls.
2//!
3//! **RFC-v0.7 Wave 4 (Agent A).** Implements the five exec-core write
4//! + read methods for the Postgres backend:
5//!
6//!  1. [`create_execution_impl`] — inherent entry on
7//!     [`super::PostgresBackend`]. Replaces the Valkey `ff_create_execution`
8//!     FCALL with a single `INSERT ... ON CONFLICT DO NOTHING` for
9//!     idempotency (idempotency-key replay mirrors the FCALL's
10//!     `Duplicate` outcome). Also seeds the global [`ff_lane_registry`]
11//!     row on first sight of a lane (Q6 adjudication — dynamic lanes
12//!     get a registry entry here as well as at server-boot seeding).
13//!  2. [`describe_execution_impl`] — single-row `SELECT` against
14//!     `ff_exec_core`, decoded into [`ExecutionSnapshot`] via the
15//!     shared [`ff_core::contracts::decode::build_execution_snapshot`]
16//!     helper so the snapshot shape matches the Valkey backend
17//!     bit-for-bit.
18//!  3. [`list_executions_impl`] — cursor-paginated forward scan over
19//!     one `partition_key`, `ORDER BY execution_id ASC`. Uses the
20//!     N+1 trick (fetch `limit+1` rows) to decide `next_cursor`.
21//!  4. [`cancel_impl`] — transactional `SELECT ... FOR UPDATE` +
22//!     `UPDATE` to transition the exec row to
23//!     `public_state='cancelled'`, `lifecycle_phase='terminal'`,
24//!     setting `terminal_at_ms` + `cancellation_reason`. Per Q11 the
25//!     default READ COMMITTED isolation suffices; the row lock
26//!     narrows the RMW to one writer.
//!  5. [`resolve_execution_flow_id_impl`] — one-column (`flow_id`)
//!     lookup for the admin-tooling path. The query filters on
//!     `(partition_key, execution_id)`, taking the partition from the
//!     execution id itself, so it reads a single row.
30//!
31//! Spec authority: `rfcs/drafts/v0.7-migration-master.md` Q5 (partition
32//! math), Q11 (isolation), Q14 (dual-backend — no cross-backend
33//! fields).
34
35use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{
40    CreateExecutionArgs, ExecutionContext, ExecutionInfo, ExecutionSnapshot, ListExecutionsPage,
41};
42use ff_core::engine_error::{EngineError, ValidationKind};
43use ff_core::partition::{PartitionConfig, PartitionKey};
44use ff_core::state::{
45    AttemptState, BlockingReason, EligibilityState, LifecyclePhase, OwnershipState, PublicState,
46    StateVector, TerminalOutcome,
47};
48use ff_core::types::{ExecutionId, FlowId};
49use serde_json::Value as JsonValue;
50use sqlx::{PgPool, Row};
51use uuid::Uuid;
52
53use crate::error::map_sqlx_error;
54
55/// Extract the raw UUID suffix from an [`ExecutionId`]'s wire form
56/// (`"{fp:N}:<uuid>"`). The constructors / [`ExecutionId::parse`]
57/// guarantee the `}:` separator exists and the suffix is a valid
58/// UUID.
59pub(crate) fn eid_uuid(eid: &ExecutionId) -> Uuid {
60    let s = eid.as_str();
61    // Shape is enforced by ExecutionId constructors: `{fp:N}:<uuid>`.
62    let suffix = s
63        .split_once("}:")
64        .map(|(_, u)| u)
65        .expect("ExecutionId has `}:` separator (invariant)");
66    Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
67}
68
69/// Build a typed [`FlowId`] / [`ExecutionId`] string back from a
70/// partition index + UUID. Keeps the `{fp:N}:<uuid>` wire shape the
71/// rest of FF expects.
72fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
73    let s = format!("{{fp:{partition}}}:{uuid}");
74    ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
75        kind: ValidationKind::Corruption,
76        detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
77    })
78}
79
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// The millisecond count is a `u128`; it is converted with a checked
/// conversion and saturates at `i64::MAX` instead of being truncated by
/// an `as` cast (which could wrap to an arbitrary or negative value).
/// The result is therefore always non-negative.
///
/// # Panics
///
/// Panics if the system clock reports a time before `UNIX_EPOCH`.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
86
87// ─── create_execution ─────────────────────────────────────────────
88
89/// Insert one `ff_exec_core` row (idempotent on primary key) + seed
90/// the lane registry.
91///
92/// Idempotent replay: on primary-key conflict we treat the call as a
93/// successful duplicate and return `Ok(args.execution_id)` — matching
94/// `CreateExecutionResult::Duplicate` from the Valkey FCALL path.
95pub(super) async fn create_execution_impl(
96    pool: &PgPool,
97    _partition_config: &PartitionConfig,
98    args: CreateExecutionArgs,
99) -> Result<ExecutionId, EngineError> {
100    let partition_key: i16 = args.execution_id.partition() as i16;
101    let execution_id = eid_uuid(&args.execution_id);
102    let lane_id = args.lane_id.as_str().to_owned();
103    let priority: i32 = args.priority;
104    let created_at_ms: i64 = args.now.0;
105    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
106
107    // `raw_fields` carries every CreateExecution arg that doesn't map
108    // to a typed column today. Describe_execution reads them back via
109    // `build_execution_snapshot`, which speaks HashMap<String,String>
110    // — so we store strings here.
111    let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
112    raw.insert(
113        "namespace".into(),
114        JsonValue::String(args.namespace.as_str().to_owned()),
115    );
116    raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
117    raw.insert(
118        "creator_identity".into(),
119        JsonValue::String(args.creator_identity),
120    );
121    if let Some(k) = args.idempotency_key {
122        raw.insert("idempotency_key".into(), JsonValue::String(k));
123    }
124    if let Some(enc) = args.payload_encoding {
125        raw.insert("payload_encoding".into(), JsonValue::String(enc));
126    }
127    // last_mutation_at mirrors Valkey's exec_core field — initialised
128    // to created_at on first write.
129    raw.insert(
130        "last_mutation_at".into(),
131        JsonValue::String(created_at_ms.to_string()),
132    );
133    raw.insert(
134        "total_attempt_count".into(),
135        JsonValue::String("0".to_owned()),
136    );
137    // Tags live under raw_fields.tags as a JSON object keyed by tag name.
138    let tags_json: serde_json::Map<String, JsonValue> = args
139        .tags
140        .into_iter()
141        .map(|(k, v)| (k, JsonValue::String(v)))
142        .collect();
143    raw.insert("tags".into(), JsonValue::Object(tags_json));
144
145    let raw_fields = JsonValue::Object(raw);
146    let policy_json: Option<JsonValue> = match args.policy {
147        Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
148            kind: ValidationKind::InvalidInput,
149            detail: format!("create_execution: policy: serialize failed: {e}"),
150        })?),
151        None => None,
152    };
153
154    // Create exec row + seed lane registry in one transaction so a
155    // concurrent lane-list doesn't see an exec without the lane
156    // registry row.
157    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
158
159    sqlx::query(
160        r#"
161        INSERT INTO ff_exec_core (
162            partition_key, execution_id, flow_id, lane_id,
163            required_capabilities, attempt_index,
164            lifecycle_phase, ownership_state, eligibility_state,
165            public_state, attempt_state,
166            priority, created_at_ms, deadline_at_ms,
167            payload, policy, raw_fields
168        ) VALUES (
169            $1, $2, NULL, $3,
170            '{}'::text[], 0,
171            'submitted', 'unowned', 'eligible_now',
172            'waiting', 'pending',
173            $4, $5, $6,
174            $7, $8, $9
175        )
176        ON CONFLICT (partition_key, execution_id) DO NOTHING
177        "#,
178    )
179    .bind(partition_key)
180    .bind(execution_id)
181    .bind(&lane_id)
182    .bind(priority)
183    .bind(created_at_ms)
184    .bind(deadline_at_ms)
185    .bind(&args.input_payload)
186    .bind(policy_json)
187    .bind(&raw_fields)
188    .execute(&mut *tx)
189    .await
190    .map_err(map_sqlx_error)?;
191
192    // Lane registry: Q6 — dynamic lanes seed here on first use.
193    sqlx::query(
194        r#"
195        INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
196        VALUES ($1, $2, $3)
197        ON CONFLICT (lane_id) DO NOTHING
198        "#,
199    )
200    .bind(&lane_id)
201    .bind(created_at_ms)
202    .bind("create_execution")
203    .execute(&mut *tx)
204    .await
205    .map_err(map_sqlx_error)?;
206
207    tx.commit().await.map_err(map_sqlx_error)?;
208
209    Ok(args.execution_id)
210}
211
212// ─── describe_execution ──────────────────────────────────────────
213
214pub(super) async fn describe_execution_impl(
215    pool: &PgPool,
216    _partition_config: &PartitionConfig,
217    id: &ExecutionId,
218) -> Result<Option<ExecutionSnapshot>, EngineError> {
219    let partition_key: i16 = id.partition() as i16;
220    let execution_id = eid_uuid(id);
221
222    let row = sqlx::query(
223        r#"
224        SELECT flow_id, lane_id, public_state, blocking_reason,
225               created_at_ms, raw_fields
226        FROM ff_exec_core
227        WHERE partition_key = $1 AND execution_id = $2
228        "#,
229    )
230    .bind(partition_key)
231    .bind(execution_id)
232    .fetch_optional(pool)
233    .await
234    .map_err(map_sqlx_error)?;
235
236    let Some(row) = row else {
237        return Ok(None);
238    };
239
240    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
241    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
242    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
243    let blocking_reason: Option<String> =
244        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
245    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
246    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
247
248    // Build the HashMap<String,String> shape the shared decoder
249    // consumes. The Postgres row projection + JSON raw_fields together
250    // carry every field `build_execution_snapshot` reads.
251    let mut core: HashMap<String, String> = HashMap::new();
252    core.insert("public_state".into(), public_state);
253    core.insert("lane_id".into(), lane_id);
254    if let Some(fid) = flow_id_uuid {
255        // Reassemble `{fp:N}:<uuid>` using the exec's own partition
256        // (RFC-011 co-location: exec + flow share a partition).
257        core.insert(
258            "flow_id".into(),
259            format!("{{fp:{part}}}:{fid}", part = id.partition()),
260        );
261    }
262    if let Some(r) = blocking_reason {
263        core.insert("blocking_reason".into(), r);
264    }
265    core.insert("created_at".into(), created_at_ms.to_string());
266
267    // raw_fields-derived scalars. build_execution_snapshot hard-requires
268    // `last_mutation_at`; `create_execution_impl` seeds it to
269    // `created_at_ms`, subsequent mutators (future waves) bump it.
270    if let JsonValue::Object(map) = &raw_fields {
271        for key in [
272            "namespace",
273            "last_mutation_at",
274            "total_attempt_count",
275            "current_attempt_id",
276            "current_attempt_index",
277            "current_waitpoint_id",
278            "blocking_detail",
279        ] {
280            if let Some(JsonValue::String(s)) = map.get(key) {
281                core.insert(key.to_owned(), s.clone());
282            }
283        }
284    }
285
286    // Tags hash: extract from raw_fields.tags JSON object.
287    let tags_raw: HashMap<String, String> = match &raw_fields {
288        JsonValue::Object(map) => match map.get("tags") {
289            Some(JsonValue::Object(tag_map)) => tag_map
290                .iter()
291                .filter_map(|(k, v)| {
292                    v.as_str().map(|s| (k.clone(), s.to_owned()))
293                })
294                .collect(),
295            _ => HashMap::new(),
296        },
297        _ => HashMap::new(),
298    };
299
300    build_execution_snapshot(id.clone(), &core, tags_raw)
301}
302
303// ─── read_execution_context ──────────────────────────────────────
304
305/// Point-read of `(input_payload, execution_kind, tags)` from
306/// `ff_exec_core`. Used by the SDK worker to populate a freshly
307/// claimed [`ClaimedTask`](ff_sdk::ClaimedTask). Returns
308/// [`EngineError::Validation { kind: InvalidInput, .. }`](ff_core::engine_error::EngineError::Validation)
309/// when the execution does not exist (the SDK only calls this
310/// post-claim, so a missing row is an invariant violation).
311///
312/// Single `SELECT payload, raw_fields` on `(partition_key, execution_id)`;
313/// `execution_kind` + `tags` are projected out of `raw_fields`. The
314/// payload lives in its own `BYTEA` column so we don't round-trip the
315/// bytes through JSON encoding.
316pub(super) async fn read_execution_context_impl(
317    pool: &PgPool,
318    _partition_config: &PartitionConfig,
319    id: &ExecutionId,
320) -> Result<ExecutionContext, EngineError> {
321    let partition_key: i16 = id.partition() as i16;
322    let execution_id = eid_uuid(id);
323
324    let row = sqlx::query(
325        r#"
326        SELECT payload, raw_fields
327        FROM ff_exec_core
328        WHERE partition_key = $1 AND execution_id = $2
329        "#,
330    )
331    .bind(partition_key)
332    .bind(execution_id)
333    .fetch_optional(pool)
334    .await
335    .map_err(map_sqlx_error)?;
336
337    let Some(row) = row else {
338        return Err(EngineError::Validation {
339            kind: ValidationKind::InvalidInput,
340            detail: format!("read_execution_context: execution not found: {id}"),
341        });
342    };
343
344    let payload: Option<Vec<u8>> = row.try_get("payload").map_err(map_sqlx_error)?;
345    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
346
347    let input_payload = payload.unwrap_or_default();
348
349    let (execution_kind, tags) = match &raw_fields {
350        JsonValue::Object(map) => {
351            let kind = map
352                .get("execution_kind")
353                .and_then(|v| v.as_str())
354                .map(|s| s.to_owned())
355                .unwrap_or_default();
356            let tags: HashMap<String, String> = match map.get("tags") {
357                Some(JsonValue::Object(tag_map)) => tag_map
358                    .iter()
359                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_owned())))
360                    .collect(),
361                _ => HashMap::new(),
362            };
363            (kind, tags)
364        }
365        _ => (String::new(), HashMap::new()),
366    };
367
368    Ok(ExecutionContext::new(input_payload, execution_kind, tags))
369}
370
371// ─── read_current_attempt_index ──────────────────────────────────
372
373/// Point-read of `attempt_index` from `ff_exec_core`. Used by the SDK
374/// worker on the `claim_from_resume_grant` path before dispatching
375/// `claim_resumed_execution`. Missing row → `InvalidInput` (same
376/// convention as [`read_execution_context_impl`]).
377pub(super) async fn read_current_attempt_index_impl(
378    pool: &PgPool,
379    _partition_config: &PartitionConfig,
380    id: &ExecutionId,
381) -> Result<ff_core::types::AttemptIndex, EngineError> {
382    let partition_key: i16 = id.partition() as i16;
383    let execution_id = eid_uuid(id);
384
385    let row = sqlx::query(
386        r#"
387        SELECT attempt_index
388        FROM ff_exec_core
389        WHERE partition_key = $1 AND execution_id = $2
390        "#,
391    )
392    .bind(partition_key)
393    .bind(execution_id)
394    .fetch_optional(pool)
395    .await
396    .map_err(map_sqlx_error)?;
397
398    let Some(row) = row else {
399        return Err(EngineError::Validation {
400            kind: ValidationKind::InvalidInput,
401            detail: format!(
402                "read_current_attempt_index: execution not found: {id}"
403            ),
404        });
405    };
406    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
407    // `.max(0) as u32`: column is `integer NOT NULL DEFAULT 0`, but a
408    // negative value (from a hand-edited row, say) would wrap to a very
409    // large u32 under a direct cast. Clamp to 0 — matches the PG
410    // convention used elsewhere in this file for attempt-index reads
411    // (see `attempt.rs::claim_impl`).
412    let attempt_index =
413        ff_core::types::AttemptIndex::new(attempt_index_i.max(0) as u32);
414    Ok(attempt_index)
415}
416
417// ─── read_total_attempt_count ────────────────────────────────────
418
419/// Point-read of `total_attempt_count` from `ff_exec_core.raw_fields`
420/// JSONB. Used by the SDK worker's `claim_from_grant` path to compute
421/// the next fresh attempt index (retry-path fix, v0.12 PR-5.5). The
422/// field is seeded to `"0"` by `create_execution_impl` and bumped by
423/// the same SQL transactions that advance the attempt row — it lives
424/// in the JSON bag rather than a column, matching the SQLite sibling.
425///
426/// Missing row → `InvalidInput`. Missing/non-numeric field → `0` (the
427/// FCALL surface handles the loud error case; this pre-read is
428/// best-effort).
429pub(super) async fn read_total_attempt_count_impl(
430    pool: &PgPool,
431    _partition_config: &PartitionConfig,
432    id: &ExecutionId,
433) -> Result<ff_core::types::AttemptIndex, EngineError> {
434    let partition_key: i16 = id.partition() as i16;
435    let execution_id = eid_uuid(id);
436
437    let row = sqlx::query(
438        r#"
439        SELECT raw_fields ->> 'total_attempt_count' AS total_attempt_count
440        FROM ff_exec_core
441        WHERE partition_key = $1 AND execution_id = $2
442        "#,
443    )
444    .bind(partition_key)
445    .bind(execution_id)
446    .fetch_optional(pool)
447    .await
448    .map_err(map_sqlx_error)?;
449
450    let Some(row) = row else {
451        return Err(EngineError::Validation {
452            kind: ValidationKind::InvalidInput,
453            detail: format!(
454                "read_total_attempt_count: execution not found: {id}"
455            ),
456        });
457    };
458    let raw: Option<String> = row
459        .try_get("total_attempt_count")
460        .map_err(map_sqlx_error)?;
461    let count = raw
462        .as_deref()
463        .and_then(|s| s.parse::<u32>().ok())
464        .unwrap_or(0);
465    Ok(ff_core::types::AttemptIndex::new(count))
466}
467
468// ─── list_executions ─────────────────────────────────────────────
469
470pub(super) async fn list_executions_impl(
471    pool: &PgPool,
472    _partition_config: &PartitionConfig,
473    partition: PartitionKey,
474    cursor: Option<ExecutionId>,
475    limit: usize,
476) -> Result<ListExecutionsPage, EngineError> {
477    if limit == 0 {
478        return Ok(ListExecutionsPage::new(Vec::new(), None));
479    }
480    // Parse the partition tag into a partition index (u16).
481    let parsed = partition.parse().map_err(|e| EngineError::Validation {
482        kind: ValidationKind::InvalidInput,
483        detail: format!("list_executions: partition: '{partition}': {e}"),
484    })?;
485    let partition_key: i16 = parsed.index as i16;
486    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
487
488    // N+1 trick: fetch one extra row to decide `next_cursor`.
489    let effective_limit = limit.min(1000);
490    let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
491
492    let rows = sqlx::query(
493        r#"
494        SELECT execution_id
495        FROM ff_exec_core
496        WHERE partition_key = $1
497          AND ($2::uuid IS NULL OR execution_id > $2)
498        ORDER BY execution_id ASC
499        LIMIT $3
500        "#,
501    )
502    .bind(partition_key)
503    .bind(cursor_uuid)
504    .bind(fetch_limit)
505    .fetch_all(pool)
506    .await
507    .map_err(map_sqlx_error)?;
508
509    let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
510    for row in &rows {
511        let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
512        ids.push(eid_from_parts(parsed.index, u)?);
513    }
514
515    let has_more = ids.len() > effective_limit;
516    if has_more {
517        ids.truncate(effective_limit);
518    }
519    let next_cursor = if has_more { ids.last().cloned() } else { None };
520    Ok(ListExecutionsPage::new(ids, next_cursor))
521}
522
523// ─── cancel ──────────────────────────────────────────────────────
524
525/// Cancel one execution by handle (single-execution cancel; flow-wide
526/// cancel is the sibling Wave-4c agent's lane).
527///
528/// Q11: runs under READ COMMITTED with an explicit row lock via
529/// `SELECT ... FOR UPDATE` to serialise the state check + UPDATE.
530/// Already-terminal executions are a successful no-op (idempotent
531/// replay); mismatched lease surfaces as `EngineError::Contention`.
532pub(super) async fn cancel_impl(
533    pool: &PgPool,
534    _partition_config: &PartitionConfig,
535    execution_id: &ExecutionId,
536    reason: &str,
537) -> Result<(), EngineError> {
538    let partition_key: i16 = execution_id.partition() as i16;
539    let eid_uuid = eid_uuid(execution_id);
540    let now = now_ms();
541
542    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
543
544    let current: Option<(String, String)> = sqlx::query_as(
545        r#"
546        SELECT lifecycle_phase, public_state
547        FROM ff_exec_core
548        WHERE partition_key = $1 AND execution_id = $2
549        FOR UPDATE
550        "#,
551    )
552    .bind(partition_key)
553    .bind(eid_uuid)
554    .fetch_optional(&mut *tx)
555    .await
556    .map_err(map_sqlx_error)?;
557
558    let Some((lifecycle_phase, public_state)) = current else {
559        // Not-found on a cancel is an operator-visible state error —
560        // matches the Valkey FCALL's `execution_not_found` code.
561        tx.rollback().await.map_err(map_sqlx_error)?;
562        return Err(EngineError::Validation {
563            kind: ValidationKind::InvalidInput,
564            detail: format!(
565                "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
566            ),
567        });
568    };
569
570    // Terminal-state replay is a successful no-op. Mirrors Valkey's
571    // `reconcile_terminal_replay` path.
572    if lifecycle_phase == "terminal" {
573        tx.rollback().await.map_err(map_sqlx_error)?;
574        // Idempotent success iff already cancelled; other terminal states
575        // (completed/failed/expired/skipped) surface as a state conflict
576        // so operators don't silently squash a real terminal outcome.
577        return if public_state == "cancelled" {
578            Ok(())
579        } else {
580            Err(EngineError::Validation {
581                kind: ValidationKind::InvalidInput,
582                detail: format!(
583                    "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
584                ),
585            })
586        };
587    }
588
589    sqlx::query(
590        r#"
591        UPDATE ff_exec_core
592        SET lifecycle_phase     = 'terminal',
593            ownership_state     = 'unowned',
594            eligibility_state   = 'not_applicable',
595            public_state        = 'cancelled',
596            attempt_state       = 'cancelled',
597            terminal_at_ms      = $3,
598            cancellation_reason = $4,
599            cancelled_by        = 'worker',
600            raw_fields          = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
601        WHERE partition_key = $1 AND execution_id = $2
602        "#,
603    )
604    .bind(partition_key)
605    .bind(eid_uuid)
606    .bind(now)
607    .bind(reason)
608    .execute(&mut *tx)
609    .await
610    .map_err(map_sqlx_error)?;
611
612    // #355: clear the current attempt's `outcome` so a later
613    // `read_execution_info` doesn't surface a stale
614    // `retry`/`interrupted` terminal-outcome on a cancelled row.
615    // Mirrors the equivalent clear on the `cancel_flow` member loop
616    // (`flow.rs`).
617    sqlx::query(
618        r#"
619        UPDATE ff_attempt
620           SET outcome = NULL
621         WHERE partition_key = $1
622           AND execution_id  = $2
623           AND attempt_index = (SELECT attempt_index FROM ff_exec_core
624                                 WHERE partition_key = $1 AND execution_id = $2)
625        "#,
626    )
627    .bind(partition_key)
628    .bind(eid_uuid)
629    .execute(&mut *tx)
630    .await
631    .map_err(map_sqlx_error)?;
632
633    tx.commit().await.map_err(map_sqlx_error)?;
634    Ok(())
635}
636
637// ─── resolve_execution_flow_id ───────────────────────────────────
638
639pub(super) async fn resolve_execution_flow_id_impl(
640    pool: &PgPool,
641    _partition_config: &PartitionConfig,
642    eid: &ExecutionId,
643) -> Result<Option<FlowId>, EngineError> {
644    let partition_key: i16 = eid.partition() as i16;
645    let execution_id = eid_uuid(eid);
646
647    let row: Option<(Option<Uuid>,)> = sqlx::query_as(
648        r#"
649        SELECT flow_id
650        FROM ff_exec_core
651        WHERE partition_key = $1 AND execution_id = $2
652        "#,
653    )
654    .bind(partition_key)
655    .bind(execution_id)
656    .fetch_optional(pool)
657    .await
658    .map_err(map_sqlx_error)?;
659
660    let Some((maybe_fid,)) = row else {
661        return Ok(None);
662    };
663    let Some(fid_uuid) = maybe_fid else {
664        return Ok(None);
665    };
666    let s = fid_uuid.to_string();
667    FlowId::parse(&s)
668        .map(Some)
669        .map_err(|e| EngineError::Validation {
670            kind: ValidationKind::Corruption,
671            detail: format!(
672                "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
673            ),
674        })
675}
676
677// ─── RFC-020 Wave 9 Spine-B — read model (3 methods) ─────────────
678//
679// All three reads project from the `ff_exec_core` row for the target
680// `(partition_key, execution_id)`; `read_execution_info` additionally
681// LATERAL-joins `ff_attempt` for the current-attempt row (outcome).
682// READ COMMITTED is sufficient — all three are single-query, read-only,
683// no CAS. Per RFC §4.1 + §7.8, `get_execution_result` is
684// current-attempt semantics (matches Valkey's `GET ctx.result()`
685// primitive; `result` column on `ff_exec_core`).
686//
687// ── Column-literal alphabet: the read-boundary adapter (#354) ────
688//
689// The Postgres `ff_exec_core` state columns — `lifecycle_phase`,
690// `ownership_state`, `eligibility_state`, `public_state`,
691// `attempt_state` — encode `(phase × eligibility × terminal-outcome)`
692// in a **richer private alphabet** than the canonical `ff_core::state`
693// enums (`LifecyclePhase`, `OwnershipState`, `EligibilityState`,
694// `PublicState`, `AttemptState`). Write sites in this crate legitimately
695// produce Postgres-specific literals that are **not** members of the
696// public enums:
697//
698//   * `flow.rs:672-687` (cancel-member loop) writes `cancelled` to
699//     `lifecycle_phase`, `eligibility_state`, and `public_state` on
700//     flow-cancel.
701//   * `operator.rs:211-235` (`cancel_execution`) likewise writes
702//     `cancelled`.
703//   * `exec_core::cancel_impl` writes `cancelled` to `public_state` +
704//     `attempt_state` and the sentinel `terminal` to `lifecycle_phase`
705//     (the Handle-level single-exec cancel path).
706//   * `suspend_ops.rs:958` (resume-claim) writes bare `running` to
707//     `public_state` (canonical form is `active`).
708//   * `suspend_ops.rs:585` writes `released` to `ownership_state`
709//     (canonical form is `unowned`).
710//   * Scheduler-transitional paths write `pending_claim` to
711//     `eligibility_state` and `attempt_state`; legacy dispatch paths
712//     write `blocked` to `lifecycle_phase`.
713//
714// **This module is the documented read-boundary adapter** (Option B
715// per owner decision on #354 / RFC-020 §4.1 Revision 8). The
716// `normalise_*` helpers below collapse each column literal to the
717// closest public-enum variant before `json_enum!` deserialises the
718// string into the enum type; unknown tokens fall through the match arm
719// unchanged and `json_enum!` surfaces them as
720// `ValidationKind::Corruption` rather than silently defaulting.
721//
722// **Invariant for new code:**
723//
724//   * New read paths against these columns MUST call through the
725//     `normalise_*` helpers before constructing a public enum.
726//   * New write paths MAY introduce new column literals (the column is
727//     the authoritative audit trail of which backend phase produced the
728//     row), but MUST update the corresponding `normalise_*` arm in the
729//     same PR so the read path stays total.
730//   * Option A — migrating write paths to canonical literals only —
731//     was considered and rejected (RFC-020 §4.1 Revision 8): it would
732//     relocate the `(phase × eligibility × terminal-outcome)` encoding
733//     from SQL columns into per-write computation without reducing
734//     mapping-layer complexity, and would lose the column-level
735//     distinction between e.g. `cancelled` (terminal-by-cancel) and
736//     `terminal` (terminal-by-success/fail/expire/skip) that the
737//     `derive_terminal_outcome` helper below depends on.
738
739// Normalisation maps: `ff_exec_core` literal → closest
740// serde-snake_case enum variant. Each arm is grounded in an actual
741// write site in this crate; unknown tokens fall through so
742// `json_enum!` surfaces `Corruption` loudly.
743
/// Folds an `ff_exec_core.lifecycle_phase` literal onto the spelling
/// used by the public lifecycle-phase enum.
///
/// * `cancelled`, `terminal` → `terminal`
/// * `pending`, `runnable`, `eligible`, `blocked` → `runnable`
/// * everything else — including the already-canonical `active`,
///   `suspended` and `submitted` — is returned unchanged, so unknown
///   tokens reach the caller as-is.
fn normalise_lifecycle_phase(raw: &str) -> &str {
    // Terminal family: `cancelled` is the Postgres-specific spelling.
    if matches!(raw, "cancelled" | "terminal") {
        return "terminal";
    }
    // Pre-runnable and runnable spellings all collapse to `runnable`;
    // `blocked` is a legacy literal.
    if matches!(raw, "pending" | "runnable" | "eligible" | "blocked") {
        return "runnable";
    }
    raw
}
760
/// Folds an `ff_exec_core.ownership_state` literal onto the spelling
/// used by the public ownership-state enum.
///
/// Only `released` (a Postgres-specific post-suspension marker) is
/// rewritten, to `unowned`. Every other token — `unowned`, `leased`,
/// `lease_expired_reclaimable`, `lease_revoked`, or anything unknown —
/// is returned unchanged.
fn normalise_ownership_state(raw: &str) -> &str {
    if raw == "released" {
        "unowned"
    } else {
        raw
    }
}
772
/// Folds an `ff_exec_core.eligibility_state` literal onto the spelling
/// used by the public eligibility-state enum.
///
/// * `cancelled` → `not_applicable` (terminal-cancelled rows)
/// * `pending_claim` → `eligible_now` (scheduler transitional state:
///   picked for a claim but not yet acknowledged by a worker)
/// * anything else is returned unchanged.
fn normalise_eligibility_state(raw: &str) -> &str {
    if raw == "cancelled" {
        "not_applicable"
    } else if raw == "pending_claim" {
        "eligible_now"
    } else {
        raw
    }
}
785
/// Folds an `ff_exec_core.attempt_state` literal onto the spelling used
/// by the public attempt-state enum.
///
/// Tokens listed in the alias table are rewritten; anything else is
/// returned unchanged.
fn normalise_attempt_state(raw: &str) -> &str {
    // (column literal, canonical spelling)
    const ALIASES: [(&str, &str); 4] = [
        // Initial-insert literal written by `create_execution_impl`.
        ("pending", "pending_first_attempt"),
        // Scheduler transitional write.
        ("pending_claim", "pending_first_attempt"),
        // Bare `running` from the suspension-resume claim path.
        ("running", "running_attempt"),
        // No direct enum variant for a cancelled attempt; nearest is
        // the terminal one.
        ("cancelled", "attempt_terminal"),
    ];

    ALIASES
        .iter()
        .find(|(literal, _)| *literal == raw)
        .map_or(raw, |(_, canonical)| *canonical)
}
801
/// Translate a stored `public_state` literal into the snake_case serde
/// form of `PublicState`. Every token other than `running` is returned
/// untouched.
fn normalise_public_state(raw: &str) -> &str {
    // The resume-claim path (suspend_ops.rs:958) writes the bare
    // `running` literal; Valkey and the `PublicState` enum spell the
    // same state `active`.
    if raw == "running" {
        "active"
    } else {
        raw
    }
}
813
/// Decode a state literal into the serde enum `$ty`.
///
/// * `$ty` — target type; decoded through its `Deserialize` impl.
/// * `$field` — column name, used only in the error detail.
/// * `$raw` — string-like expression (`&str`, `String`, `&String`);
///   evaluated exactly once.
///
/// Evaluates to `Result<$ty, EngineError>`. A literal the enum does not
/// recognise yields `EngineError::Validation` with
/// `ValidationKind::Corruption`.
///
/// The literal is wrapped in a `serde_json::Value::String` rather than
/// spliced into hand-built JSON text. Splicing would let a stored value
/// carrying a JSON escape (for example `\u0061ctive`) decode as a valid
/// variant, and would turn a stray `"` or `\` into a confusing parser
/// error instead of an "unknown value" one.
macro_rules! json_enum {
    ($ty:ty, $field:expr, $raw:expr) => {{
        let raw_value: &str = &$raw;
        serde_json::from_value::<$ty>(serde_json::Value::String(raw_value.to_owned())).map_err(
            |e| EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!(
                    "exec_core: {}: '{}' is not a known value: {}",
                    $field, raw_value, e
                ),
            },
        )
    }};
}
826
827/// Map an `ff_attempt.outcome` string to a [`TerminalOutcome`]. Only
828/// meaningful when `lifecycle_phase` is terminal/cancelled; otherwise
829/// returns `TerminalOutcome::None`.
830fn derive_terminal_outcome(
831    phase_norm: &str,
832    phase_raw: &str,
833    attempt_outcome: Option<&str>,
834) -> TerminalOutcome {
835    if phase_norm != "terminal" {
836        return TerminalOutcome::None;
837    }
838    if phase_raw == "cancelled" {
839        return TerminalOutcome::Cancelled;
840    }
841    match attempt_outcome {
842        Some("success") => TerminalOutcome::Success,
843        Some("failed") => TerminalOutcome::Failed,
844        Some("cancelled") => TerminalOutcome::Cancelled,
845        Some("expired") => TerminalOutcome::Expired,
846        Some("skipped") => TerminalOutcome::Skipped,
847        _ => TerminalOutcome::None,
848    }
849}
850
851/// RFC-020 §4.1 — `read_execution_state`: single-column point read of
852/// `public_state`. `Ok(None)` when the execution is missing.
853pub(super) async fn read_execution_state_impl(
854    pool: &PgPool,
855    _partition_config: &PartitionConfig,
856    id: &ExecutionId,
857) -> Result<Option<PublicState>, EngineError> {
858    let partition_key: i16 = id.partition() as i16;
859    let execution_id = eid_uuid(id);
860
861    let row: Option<(String,)> = sqlx::query_as(
862        r#"
863        SELECT public_state
864        FROM ff_exec_core
865        WHERE partition_key = $1 AND execution_id = $2
866        "#,
867    )
868    .bind(partition_key)
869    .bind(execution_id)
870    .fetch_optional(pool)
871    .await
872    .map_err(map_sqlx_error)?;
873
874    let Some((raw,)) = row else {
875        return Ok(None);
876    };
877    let parsed: PublicState =
878        json_enum!(PublicState, "public_state", normalise_public_state(&raw))?;
879    Ok(Some(parsed))
880}
881
882/// RFC-020 §4.1 — `read_execution_info`: multi-column projection of
883/// `ff_exec_core` + `LEFT JOIN LATERAL` on `ff_attempt` (current attempt
884/// row) to build [`ExecutionInfo`]. `Ok(None)` when the execution is
885/// missing. Partition-local (both tables co-located on `partition_key`,
886/// RFC-011).
887pub(super) async fn read_execution_info_impl(
888    pool: &PgPool,
889    _partition_config: &PartitionConfig,
890    id: &ExecutionId,
891) -> Result<Option<ExecutionInfo>, EngineError> {
892    let partition_key: i16 = id.partition() as i16;
893    let execution_id = eid_uuid(id);
894
895    // LATERAL join: `cur` pins to the current attempt (for
896    // `outcome` — drives `TerminalOutcome`). Since migration 0016
897    // (#356) `ff_exec_core.started_at_ms` is a set-once column
898    // populated on first claim, so the `ExecutionInfo.started_at`
899    // field reads directly from the base row — the second LATERAL
900    // join on `ff_attempt.started_at_ms` (earliest non-NULL) is gone.
901    let row = sqlx::query(
902        r#"
903        SELECT ec.flow_id,
904               ec.lane_id,
905               ec.priority,
906               ec.lifecycle_phase,
907               ec.ownership_state,
908               ec.eligibility_state,
909               ec.public_state,
910               ec.attempt_state,
911               ec.blocking_reason,
912               ec.attempt_index,
913               ec.created_at_ms,
914               ec.terminal_at_ms,
915               ec.started_at_ms,
916               ec.raw_fields,
917               cur.outcome       AS attempt_outcome
918        FROM ff_exec_core ec
919        LEFT JOIN LATERAL (
920            SELECT outcome
921            FROM ff_attempt
922            WHERE partition_key = ec.partition_key
923              AND execution_id  = ec.execution_id
924              AND attempt_index = ec.attempt_index
925        ) cur ON TRUE
926        WHERE ec.partition_key = $1 AND ec.execution_id = $2
927        "#,
928    )
929    .bind(partition_key)
930    .bind(execution_id)
931    .fetch_optional(pool)
932    .await
933    .map_err(map_sqlx_error)?;
934
935    let Some(row) = row else {
936        return Ok(None);
937    };
938
939    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
940    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
941    let priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
942    let lifecycle_phase_raw: String =
943        row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
944    let ownership_state_raw: String =
945        row.try_get("ownership_state").map_err(map_sqlx_error)?;
946    let eligibility_state_raw: String =
947        row.try_get("eligibility_state").map_err(map_sqlx_error)?;
948    let public_state_raw: String = row.try_get("public_state").map_err(map_sqlx_error)?;
949    let attempt_state_raw: String = row.try_get("attempt_state").map_err(map_sqlx_error)?;
950    let blocking_reason_opt: Option<String> =
951        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
952    let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
953    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
954    let terminal_at_ms_opt: Option<i64> =
955        row.try_get("terminal_at_ms").map_err(map_sqlx_error)?;
956    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
957    let attempt_outcome_opt: Option<String> =
958        row.try_get("attempt_outcome").map_err(map_sqlx_error)?;
959    let started_at_ms_opt: Option<i64> =
960        row.try_get("started_at_ms").map_err(map_sqlx_error)?;
961
962    let lifecycle_phase: LifecyclePhase = json_enum!(
963        LifecyclePhase,
964        "lifecycle_phase",
965        normalise_lifecycle_phase(&lifecycle_phase_raw)
966    )?;
967    let ownership_state: OwnershipState = json_enum!(
968        OwnershipState,
969        "ownership_state",
970        normalise_ownership_state(&ownership_state_raw)
971    )?;
972    let eligibility_state: EligibilityState = json_enum!(
973        EligibilityState,
974        "eligibility_state",
975        normalise_eligibility_state(&eligibility_state_raw)
976    )?;
977    let public_state: PublicState = json_enum!(
978        PublicState,
979        "public_state",
980        normalise_public_state(&public_state_raw)
981    )?;
982    let attempt_state: AttemptState = json_enum!(
983        AttemptState,
984        "attempt_state",
985        normalise_attempt_state(&attempt_state_raw)
986    )?;
987    let blocking_reason: BlockingReason = match blocking_reason_opt
988        .as_deref()
989        .filter(|s| !s.is_empty())
990    {
991        None => BlockingReason::None,
992        Some(raw) => json_enum!(BlockingReason, "blocking_reason", raw)?,
993    };
994    let terminal_outcome = derive_terminal_outcome(
995        normalise_lifecycle_phase(&lifecycle_phase_raw),
996        &lifecycle_phase_raw,
997        attempt_outcome_opt.as_deref(),
998    );
999
1000    let state_vector = StateVector {
1001        lifecycle_phase,
1002        ownership_state,
1003        eligibility_state,
1004        blocking_reason,
1005        terminal_outcome,
1006        attempt_state,
1007        public_state,
1008    };
1009
1010    // Scalar fields from raw_fields JSON (namespace, execution_kind,
1011    // blocking_detail). Same shape as `describe_execution_impl` reads.
1012    let mut namespace = String::new();
1013    let mut execution_kind = String::new();
1014    let mut blocking_detail = String::new();
1015    if let JsonValue::Object(map) = &raw_fields {
1016        if let Some(JsonValue::String(s)) = map.get("namespace") {
1017            namespace = s.clone();
1018        }
1019        if let Some(JsonValue::String(s)) = map.get("execution_kind") {
1020            execution_kind = s.clone();
1021        }
1022        if let Some(JsonValue::String(s)) = map.get("blocking_detail") {
1023            blocking_detail = s.clone();
1024        }
1025    }
1026
1027    // `ExecutionInfo.flow_id` is the bare UUID wire form of `FlowId`
1028    // (per `uuid_id!` macro — no hash-tag prefix). Valkey stores the
1029    // bare UUID in `exec_core["flow_id"]` too (Valkey parity).
1030    let flow_id = flow_id_uuid.map(|fid| fid.to_string());
1031
1032    Ok(Some(ExecutionInfo {
1033        execution_id: id.clone(),
1034        namespace,
1035        lane_id,
1036        priority,
1037        execution_kind,
1038        state_vector,
1039        public_state,
1040        created_at: created_at_ms.to_string(),
1041        started_at: started_at_ms_opt.map(|v| v.to_string()),
1042        completed_at: terminal_at_ms_opt.map(|v| v.to_string()),
1043        current_attempt_index: attempt_index.max(0) as u32,
1044        flow_id,
1045        blocking_detail,
1046    }))
1047}
1048
1049/// RFC-020 §4.1 + §7.8 — `get_execution_result`: current-attempt
1050/// semantics (matches Valkey's `GET ctx.result()`). Reads the `result`
1051/// column from `ff_exec_core`; `Ok(None)` when the execution is missing
1052/// or the result is `NULL` (not yet terminal, or cancelled without a
1053/// payload).
1054pub(super) async fn get_execution_result_impl(
1055    pool: &PgPool,
1056    _partition_config: &PartitionConfig,
1057    id: &ExecutionId,
1058) -> Result<Option<Vec<u8>>, EngineError> {
1059    let partition_key: i16 = id.partition() as i16;
1060    let execution_id = eid_uuid(id);
1061
1062    let row: Option<(Option<Vec<u8>>,)> = sqlx::query_as(
1063        r#"
1064        SELECT result
1065        FROM ff_exec_core
1066        WHERE partition_key = $1 AND execution_id = $2
1067        "#,
1068    )
1069    .bind(partition_key)
1070    .bind(execution_id)
1071    .fetch_optional(pool)
1072    .await
1073    .map_err(map_sqlx_error)?;
1074
1075    match row {
1076        None => Ok(None),
1077        Some((payload,)) => Ok(payload),
1078    }
1079}
1080
1081
1082// ─── set_execution_tag / get_execution_tag (issue #433) ──────────
1083
1084/// Upsert a single namespaced tag into `raw_fields->'tags'->><key>`.
1085/// Matches the `EngineBackend::set_execution_tag` wire semantics: key
1086/// is assumed pre-validated by `ff_core::engine_backend::validate_tag_key`.
1087///
1088/// Missing execution → `EngineError::NotFound { entity: "execution" }`
1089/// (mirrors Valkey's `execution_not_found` FCALL mapping).
1090pub(super) async fn set_execution_tag_impl(
1091    pool: &PgPool,
1092    id: &ExecutionId,
1093    key: &str,
1094    value: &str,
1095) -> Result<(), EngineError> {
1096    let partition_key: i16 = id.partition() as i16;
1097    let execution_id = eid_uuid(id);
1098
1099    let result = sqlx::query(
1100        r#"
1101        UPDATE ff_exec_core
1102        SET raw_fields = jsonb_set(
1103            COALESCE(raw_fields, '{}'::jsonb),
1104            ARRAY['tags', $3::text],
1105            to_jsonb($4::text),
1106            true
1107        )
1108        WHERE partition_key = $1 AND execution_id = $2
1109        "#,
1110    )
1111    .bind(partition_key)
1112    .bind(execution_id)
1113    .bind(key)
1114    .bind(value)
1115    .execute(pool)
1116    .await
1117    .map_err(map_sqlx_error)?;
1118
1119    if result.rows_affected() == 0 {
1120        return Err(EngineError::NotFound {
1121            entity: "execution",
1122        });
1123    }
1124    Ok(())
1125}
1126
1127/// Point-read of `raw_fields->'tags'->><key>`. `Ok(None)` covers
1128/// missing-tag and missing-execution alike — matches Valkey's `HGET`
1129/// collapse (see `EngineBackend::get_execution_tag` rustdoc).
1130pub(super) async fn get_execution_tag_impl(
1131    pool: &PgPool,
1132    id: &ExecutionId,
1133    key: &str,
1134) -> Result<Option<String>, EngineError> {
1135    let partition_key: i16 = id.partition() as i16;
1136    let execution_id = eid_uuid(id);
1137
1138    let row: Option<(Option<String>,)> = sqlx::query_as(
1139        r#"
1140        SELECT raw_fields->'tags'->>$3
1141        FROM ff_exec_core
1142        WHERE partition_key = $1 AND execution_id = $2
1143        "#,
1144    )
1145    .bind(partition_key)
1146    .bind(execution_id)
1147    .bind(key)
1148    .fetch_optional(pool)
1149    .await
1150    .map_err(map_sqlx_error)?;
1151
1152    Ok(row.and_then(|(tag,)| tag))
1153}
1154
1155/// Point-read of `raw_fields->>'namespace'`. `Ok(None)` covers
1156/// missing-namespace and missing-execution alike (matches
1157/// `get_execution_tag_impl`). Used by the scanner per-candidate
1158/// filter — preserves the 1-point-read cost contract vs the heavier
1159/// `describe_execution` full-snapshot read.
1160pub(super) async fn get_execution_namespace_impl(
1161    pool: &PgPool,
1162    id: &ExecutionId,
1163) -> Result<Option<String>, EngineError> {
1164    let partition_key: i16 = id.partition() as i16;
1165    let execution_id = eid_uuid(id);
1166
1167    let row: Option<(Option<String>,)> = sqlx::query_as(
1168        r#"
1169        SELECT raw_fields->>'namespace'
1170        FROM ff_exec_core
1171        WHERE partition_key = $1 AND execution_id = $2
1172        "#,
1173    )
1174    .bind(partition_key)
1175    .bind(execution_id)
1176    .fetch_optional(pool)
1177    .await
1178    .map_err(map_sqlx_error)?;
1179
1180    Ok(row.and_then(|(ns,)| ns))
1181}
1182
1183// ── PR-7b Cluster 2b-B — retention trimmer (cascading purge) ──────────
1184//
1185// Retention trimming is inherently a scan-and-delete loop over time.
1186// The trait surface exists to remove engine-side Valkey coupling, not
1187// to atomise the operation into one round-trip. This impl runs one
1188// transaction per batch, selecting a `batch_size`-bounded set of
1189// terminal+old executions and cascading DELETEs across every sibling
1190// table (the schema has no FK CASCADE). Returns the count of rows
1191// deleted from `ff_exec_core` so the engine scanner can loop when it
1192// saturates the batch.
1193//
1194// Semantics match the Valkey scanner: "terminal executions whose
1195// terminal_at_ms is older than now - retention_ms". Per-execution
1196// policy overrides are deferred — Valkey's scanner applies them
1197// because it reads one exec at a time anyway; SQL retention's per-
1198// batch shape makes overrides a follow-up. For now the SQL path uses
1199// the caller-supplied global retention only.
1200
/// Sibling tables (execution-scoped) that retention must clean up
/// alongside `ff_exec_core`. Kept here so a new table with an
/// `execution_id` column triggers an explicit edit — silent schema
/// drift (new table, stale retention) would orphan rows.
///
/// Split by `execution_id` column type so the deletion loop can bind
/// a `uuid[]` directly against uuid-backed tables (preserving the
/// btree index on `(partition_key, execution_id)`) and only fall back
/// to a text cast for tables whose column is still `text`. The prior
/// unified `execution_id::text = ANY($2::text[])` form worked but
/// defeated the uuid index on the majority (uuid-backed) side.
///
/// This list holds the uuid-backed tables. `trim_retention_impl`
/// interpolates each name into a `DELETE` statement with `format!`, so
/// entries must stay hard-coded, trusted identifiers.
const RETENTION_SIBLING_TABLES_UUID: &[&str] = &[
    "ff_attempt",
    "ff_claim_grant",
    "ff_completion_event",
    "ff_stream_frame",
    "ff_stream_meta",
    "ff_stream_summary",
    "ff_suspension_current",
    "ff_waitpoint_pending",
];
1222
/// Text-keyed counterpart of [`RETENTION_SIBLING_TABLES_UUID`]:
/// execution-scoped sibling tables that `trim_retention_impl` purges by
/// matching `execution_id` against a `text[]` of stringified UUIDs
/// rather than a `uuid[]`.
///
/// As with the uuid list, each name is interpolated into a `DELETE`
/// statement with `format!`, so entries must stay hard-coded, trusted
/// identifiers.
const RETENTION_SIBLING_TABLES_TEXT: &[&str] = &[
    "ff_cancel_backlog_member",
    "ff_lease_event",
    "ff_operator_event",
    "ff_quota_admitted",
    "ff_quota_window",
    "ff_signal_event",
];
1231
1232/// Delete a batch of terminal executions past the retention cutoff
1233/// across `ff_exec_core` + every execution-scoped sibling table.
1234/// Returns the number of `ff_exec_core` rows deleted so the engine
1235/// scanner can re-loop on saturation.
1236pub(super) async fn trim_retention_impl(
1237    pool: &PgPool,
1238    partition_key: i16,
1239    lane_id: &str,
1240    retention_ms: u64,
1241    now_ms: i64,
1242    batch_size: u32,
1243    filter: &ff_core::backend::ScannerFilter,
1244) -> Result<u32, EngineError> {
1245    let retention_i64 = i64::try_from(retention_ms).unwrap_or(i64::MAX);
1246    let cutoff = now_ms.saturating_sub(retention_i64);
1247    let limit_i64 = i64::from(batch_size);
1248
1249    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
1250
1251    // Pick a batch of eligible execution_ids. `lifecycle_phase = 'terminal'`
1252    // matches the Valkey semantic (scanner consumes the terminal ZSET);
1253    // `terminal_at_ms` parallels Valkey's `completed_at` score. Rows with
1254    // terminal_at_ms IS NULL (tombstoned-but-not-yet-timestamped edge
1255    // case) are excluded — retention will pick them up on the next pass.
1256    //
1257    // Issue #122: `ScannerFilter` narrows the batch. `namespace` /
1258    // `instance_tag` live in `ff_exec_core.raw_fields` (JSONB); we
1259    // push both dimensions into the SELECT WHERE-clause so a filter-
1260    // scoped scanner cycle never reads rows outside its scope.
1261    let namespace_opt = filter.namespace.as_ref().map(|n| n.as_str().to_owned());
1262    let instance_tag_opt = filter
1263        .instance_tag
1264        .as_ref()
1265        .map(|(k, v)| (k.clone(), v.clone()));
1266
1267    let rows: Vec<(Uuid,)> = sqlx::query_as(
1268        r#"
1269        SELECT execution_id
1270        FROM ff_exec_core
1271        WHERE partition_key = $1
1272          AND lane_id = $2
1273          AND lifecycle_phase = 'terminal'
1274          AND terminal_at_ms IS NOT NULL
1275          AND terminal_at_ms <= $3
1276          AND ($5::text IS NULL OR raw_fields->>'namespace' = $5)
1277          AND ($6::text IS NULL OR raw_fields->'tags'->>$6 = $7)
1278        ORDER BY terminal_at_ms
1279        LIMIT $4
1280        "#,
1281    )
1282    .bind(partition_key)
1283    .bind(lane_id)
1284    .bind(cutoff)
1285    .bind(limit_i64)
1286    .bind(namespace_opt.as_deref())
1287    .bind(instance_tag_opt.as_ref().map(|(k, _)| k.as_str()))
1288    .bind(instance_tag_opt.as_ref().map(|(_, v)| v.as_str()))
1289    .fetch_all(&mut *tx)
1290    .await
1291    .map_err(map_sqlx_error)?;
1292
1293    if rows.is_empty() {
1294        tx.commit().await.map_err(map_sqlx_error)?;
1295        return Ok(0);
1296    }
1297
1298    let eids: Vec<Uuid> = rows.into_iter().map(|(e,)| e).collect();
1299    // Sibling tables split `execution_id` between `uuid` and `text`.
1300    // Bind per-type against the matching allowlist so each DELETE
1301    // hits the native index on its table.
1302    let eid_texts: Vec<String> = eids.iter().map(Uuid::to_string).collect();
1303
1304    // Cascade siblings first; exec_core last. Mirrors the Valkey
1305    // ordering rationale: if a sibling delete fails mid-batch the
1306    // whole tx rolls back and the next pass rebuilds the full delete
1307    // set by re-reading exec_core. Table names are injected from the
1308    // allowlists above — no caller influence on table identity.
1309    for table in RETENTION_SIBLING_TABLES_UUID {
1310        let sql = format!(
1311            "DELETE FROM {} WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])",
1312            table
1313        );
1314        sqlx::query(&sql)
1315            .bind(partition_key)
1316            .bind(&eids)
1317            .execute(&mut *tx)
1318            .await
1319            .map_err(map_sqlx_error)?;
1320    }
1321
1322    for table in RETENTION_SIBLING_TABLES_TEXT {
1323        let sql = format!(
1324            "DELETE FROM {} WHERE partition_key = $1 AND execution_id = ANY($2::text[])",
1325            table
1326        );
1327        sqlx::query(&sql)
1328            .bind(partition_key)
1329            .bind(&eid_texts)
1330            .execute(&mut *tx)
1331            .await
1332            .map_err(map_sqlx_error)?;
1333    }
1334
1335    let deleted = sqlx::query(
1336        "DELETE FROM ff_exec_core \
1337         WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])",
1338    )
1339    .bind(partition_key)
1340    .bind(&eids)
1341    .execute(&mut *tx)
1342    .await
1343    .map_err(map_sqlx_error)?
1344    .rows_affected();
1345
1346    tx.commit().await.map_err(map_sqlx_error)?;
1347
1348    Ok(u32::try_from(deleted).unwrap_or(u32::MAX))
1349}