Skip to main content

ff_backend_postgres/
exec_core.rs

1//! exec_core trait-method family — Postgres impls.
2//!
3//! **RFC-v0.7 Wave 4 (Agent A).** Implements the five exec-core write
4//! + read methods for the Postgres backend:
5//!
6//!  1. [`create_execution_impl`] — inherent entry on
7//!     [`super::PostgresBackend`]. Replaces the Valkey `ff_create_execution`
8//!     FCALL with a single `INSERT ... ON CONFLICT DO NOTHING` for
9//!     idempotency (idempotency-key replay mirrors the FCALL's
10//!     `Duplicate` outcome). Also seeds the global [`ff_lane_registry`]
11//!     row on first sight of a lane (Q6 adjudication — dynamic lanes
12//!     get a registry entry here as well as at server-boot seeding).
13//!  2. [`describe_execution_impl`] — single-row `SELECT` against
14//!     `ff_exec_core`, decoded into [`ExecutionSnapshot`] via the
15//!     shared [`ff_core::contracts::decode::build_execution_snapshot`]
16//!     helper so the snapshot shape matches the Valkey backend
17//!     bit-for-bit.
18//!  3. [`list_executions_impl`] — cursor-paginated forward scan over
19//!     one `partition_key`, `ORDER BY execution_id ASC`. Uses the
20//!     N+1 trick (fetch `limit+1` rows) to decide `next_cursor`.
21//!  4. [`cancel_impl`] — transactional `SELECT ... FOR UPDATE` +
22//!     `UPDATE` to transition the exec row to
23//!     `public_state='cancelled'`, `lifecycle_phase='terminal'`,
24//!     setting `terminal_at_ms` + `cancellation_reason`. Per Q11 the
25//!     default READ COMMITTED isolation suffices; the row lock
26//!     narrows the RMW to one writer.
27//!  5. [`resolve_execution_flow_id_impl`] — one-column lookup by
28//!     `execution_id` (the unique index lets pg skip partition
29//!     pruning for this admin-tooling path).
30//!
31//! Spec authority: `rfcs/drafts/v0.7-migration-master.md` Q5 (partition
32//! math), Q11 (isolation), Q14 (dual-backend — no cross-backend
33//! fields).
34
35use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{CreateExecutionArgs, ExecutionInfo, ExecutionSnapshot, ListExecutionsPage};
40use ff_core::engine_error::{EngineError, ValidationKind};
41use ff_core::partition::{PartitionConfig, PartitionKey};
42use ff_core::state::{
43    AttemptState, BlockingReason, EligibilityState, LifecyclePhase, OwnershipState, PublicState,
44    StateVector, TerminalOutcome,
45};
46use ff_core::types::{ExecutionId, FlowId};
47use serde_json::Value as JsonValue;
48use sqlx::{PgPool, Row};
49use uuid::Uuid;
50
51use crate::error::map_sqlx_error;
52
53/// Extract the raw UUID suffix from an [`ExecutionId`]'s wire form
54/// (`"{fp:N}:<uuid>"`). The constructors / [`ExecutionId::parse`]
55/// guarantee the `}:` separator exists and the suffix is a valid
56/// UUID.
57fn eid_uuid(eid: &ExecutionId) -> Uuid {
58    let s = eid.as_str();
59    // Shape is enforced by ExecutionId constructors: `{fp:N}:<uuid>`.
60    let suffix = s
61        .split_once("}:")
62        .map(|(_, u)| u)
63        .expect("ExecutionId has `}:` separator (invariant)");
64    Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
65}
66
67/// Build a typed [`FlowId`] / [`ExecutionId`] string back from a
68/// partition index + UUID. Keeps the `{fp:N}:<uuid>` wire shape the
69/// rest of FF expects.
70fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
71    let s = format!("{{fp:{partition}}}:{uuid}");
72    ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
73        kind: ValidationKind::Corruption,
74        detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
75    })
76}
77
78fn now_ms() -> i64 {
79    let d = SystemTime::now()
80        .duration_since(UNIX_EPOCH)
81        .expect("clock is after UNIX_EPOCH");
82    (d.as_millis() as i64).max(0)
83}
84
85// ─── create_execution ─────────────────────────────────────────────
86
87/// Insert one `ff_exec_core` row (idempotent on primary key) + seed
88/// the lane registry.
89///
90/// Idempotent replay: on primary-key conflict we treat the call as a
91/// successful duplicate and return `Ok(args.execution_id)` — matching
92/// `CreateExecutionResult::Duplicate` from the Valkey FCALL path.
93pub(super) async fn create_execution_impl(
94    pool: &PgPool,
95    _partition_config: &PartitionConfig,
96    args: CreateExecutionArgs,
97) -> Result<ExecutionId, EngineError> {
98    let partition_key: i16 = args.execution_id.partition() as i16;
99    let execution_id = eid_uuid(&args.execution_id);
100    let lane_id = args.lane_id.as_str().to_owned();
101    let priority: i32 = args.priority;
102    let created_at_ms: i64 = args.now.0;
103    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
104
105    // `raw_fields` carries every CreateExecution arg that doesn't map
106    // to a typed column today. Describe_execution reads them back via
107    // `build_execution_snapshot`, which speaks HashMap<String,String>
108    // — so we store strings here.
109    let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
110    raw.insert(
111        "namespace".into(),
112        JsonValue::String(args.namespace.as_str().to_owned()),
113    );
114    raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
115    raw.insert(
116        "creator_identity".into(),
117        JsonValue::String(args.creator_identity),
118    );
119    if let Some(k) = args.idempotency_key {
120        raw.insert("idempotency_key".into(), JsonValue::String(k));
121    }
122    if let Some(enc) = args.payload_encoding {
123        raw.insert("payload_encoding".into(), JsonValue::String(enc));
124    }
125    // last_mutation_at mirrors Valkey's exec_core field — initialised
126    // to created_at on first write.
127    raw.insert(
128        "last_mutation_at".into(),
129        JsonValue::String(created_at_ms.to_string()),
130    );
131    raw.insert(
132        "total_attempt_count".into(),
133        JsonValue::String("0".to_owned()),
134    );
135    // Tags live under raw_fields.tags as a JSON object keyed by tag name.
136    let tags_json: serde_json::Map<String, JsonValue> = args
137        .tags
138        .into_iter()
139        .map(|(k, v)| (k, JsonValue::String(v)))
140        .collect();
141    raw.insert("tags".into(), JsonValue::Object(tags_json));
142
143    let raw_fields = JsonValue::Object(raw);
144    let policy_json: Option<JsonValue> = match args.policy {
145        Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
146            kind: ValidationKind::InvalidInput,
147            detail: format!("create_execution: policy: serialize failed: {e}"),
148        })?),
149        None => None,
150    };
151
152    // Create exec row + seed lane registry in one transaction so a
153    // concurrent lane-list doesn't see an exec without the lane
154    // registry row.
155    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
156
157    sqlx::query(
158        r#"
159        INSERT INTO ff_exec_core (
160            partition_key, execution_id, flow_id, lane_id,
161            required_capabilities, attempt_index,
162            lifecycle_phase, ownership_state, eligibility_state,
163            public_state, attempt_state,
164            priority, created_at_ms, deadline_at_ms,
165            payload, policy, raw_fields
166        ) VALUES (
167            $1, $2, NULL, $3,
168            '{}'::text[], 0,
169            'submitted', 'unowned', 'eligible_now',
170            'waiting', 'pending',
171            $4, $5, $6,
172            $7, $8, $9
173        )
174        ON CONFLICT (partition_key, execution_id) DO NOTHING
175        "#,
176    )
177    .bind(partition_key)
178    .bind(execution_id)
179    .bind(&lane_id)
180    .bind(priority)
181    .bind(created_at_ms)
182    .bind(deadline_at_ms)
183    .bind(&args.input_payload)
184    .bind(policy_json)
185    .bind(&raw_fields)
186    .execute(&mut *tx)
187    .await
188    .map_err(map_sqlx_error)?;
189
190    // Lane registry: Q6 — dynamic lanes seed here on first use.
191    sqlx::query(
192        r#"
193        INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
194        VALUES ($1, $2, $3)
195        ON CONFLICT (lane_id) DO NOTHING
196        "#,
197    )
198    .bind(&lane_id)
199    .bind(created_at_ms)
200    .bind("create_execution")
201    .execute(&mut *tx)
202    .await
203    .map_err(map_sqlx_error)?;
204
205    tx.commit().await.map_err(map_sqlx_error)?;
206
207    Ok(args.execution_id)
208}
209
210// ─── describe_execution ──────────────────────────────────────────
211
212pub(super) async fn describe_execution_impl(
213    pool: &PgPool,
214    _partition_config: &PartitionConfig,
215    id: &ExecutionId,
216) -> Result<Option<ExecutionSnapshot>, EngineError> {
217    let partition_key: i16 = id.partition() as i16;
218    let execution_id = eid_uuid(id);
219
220    let row = sqlx::query(
221        r#"
222        SELECT flow_id, lane_id, public_state, blocking_reason,
223               created_at_ms, raw_fields
224        FROM ff_exec_core
225        WHERE partition_key = $1 AND execution_id = $2
226        "#,
227    )
228    .bind(partition_key)
229    .bind(execution_id)
230    .fetch_optional(pool)
231    .await
232    .map_err(map_sqlx_error)?;
233
234    let Some(row) = row else {
235        return Ok(None);
236    };
237
238    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
239    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
240    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
241    let blocking_reason: Option<String> =
242        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
243    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
244    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
245
246    // Build the HashMap<String,String> shape the shared decoder
247    // consumes. The Postgres row projection + JSON raw_fields together
248    // carry every field `build_execution_snapshot` reads.
249    let mut core: HashMap<String, String> = HashMap::new();
250    core.insert("public_state".into(), public_state);
251    core.insert("lane_id".into(), lane_id);
252    if let Some(fid) = flow_id_uuid {
253        // Reassemble `{fp:N}:<uuid>` using the exec's own partition
254        // (RFC-011 co-location: exec + flow share a partition).
255        core.insert(
256            "flow_id".into(),
257            format!("{{fp:{part}}}:{fid}", part = id.partition()),
258        );
259    }
260    if let Some(r) = blocking_reason {
261        core.insert("blocking_reason".into(), r);
262    }
263    core.insert("created_at".into(), created_at_ms.to_string());
264
265    // raw_fields-derived scalars. build_execution_snapshot hard-requires
266    // `last_mutation_at`; `create_execution_impl` seeds it to
267    // `created_at_ms`, subsequent mutators (future waves) bump it.
268    if let JsonValue::Object(map) = &raw_fields {
269        for key in [
270            "namespace",
271            "last_mutation_at",
272            "total_attempt_count",
273            "current_attempt_id",
274            "current_attempt_index",
275            "current_waitpoint_id",
276            "blocking_detail",
277        ] {
278            if let Some(JsonValue::String(s)) = map.get(key) {
279                core.insert(key.to_owned(), s.clone());
280            }
281        }
282    }
283
284    // Tags hash: extract from raw_fields.tags JSON object.
285    let tags_raw: HashMap<String, String> = match &raw_fields {
286        JsonValue::Object(map) => match map.get("tags") {
287            Some(JsonValue::Object(tag_map)) => tag_map
288                .iter()
289                .filter_map(|(k, v)| {
290                    v.as_str().map(|s| (k.clone(), s.to_owned()))
291                })
292                .collect(),
293            _ => HashMap::new(),
294        },
295        _ => HashMap::new(),
296    };
297
298    build_execution_snapshot(id.clone(), &core, tags_raw)
299}
300
301// ─── list_executions ─────────────────────────────────────────────
302
303pub(super) async fn list_executions_impl(
304    pool: &PgPool,
305    _partition_config: &PartitionConfig,
306    partition: PartitionKey,
307    cursor: Option<ExecutionId>,
308    limit: usize,
309) -> Result<ListExecutionsPage, EngineError> {
310    if limit == 0 {
311        return Ok(ListExecutionsPage::new(Vec::new(), None));
312    }
313    // Parse the partition tag into a partition index (u16).
314    let parsed = partition.parse().map_err(|e| EngineError::Validation {
315        kind: ValidationKind::InvalidInput,
316        detail: format!("list_executions: partition: '{partition}': {e}"),
317    })?;
318    let partition_key: i16 = parsed.index as i16;
319    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
320
321    // N+1 trick: fetch one extra row to decide `next_cursor`.
322    let effective_limit = limit.min(1000);
323    let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
324
325    let rows = sqlx::query(
326        r#"
327        SELECT execution_id
328        FROM ff_exec_core
329        WHERE partition_key = $1
330          AND ($2::uuid IS NULL OR execution_id > $2)
331        ORDER BY execution_id ASC
332        LIMIT $3
333        "#,
334    )
335    .bind(partition_key)
336    .bind(cursor_uuid)
337    .bind(fetch_limit)
338    .fetch_all(pool)
339    .await
340    .map_err(map_sqlx_error)?;
341
342    let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
343    for row in &rows {
344        let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
345        ids.push(eid_from_parts(parsed.index, u)?);
346    }
347
348    let has_more = ids.len() > effective_limit;
349    if has_more {
350        ids.truncate(effective_limit);
351    }
352    let next_cursor = if has_more { ids.last().cloned() } else { None };
353    Ok(ListExecutionsPage::new(ids, next_cursor))
354}
355
356// ─── cancel ──────────────────────────────────────────────────────
357
358/// Cancel one execution by handle (single-execution cancel; flow-wide
359/// cancel is the sibling Wave-4c agent's lane).
360///
361/// Q11: runs under READ COMMITTED with an explicit row lock via
362/// `SELECT ... FOR UPDATE` to serialise the state check + UPDATE.
363/// Already-terminal executions are a successful no-op (idempotent
364/// replay); mismatched lease surfaces as `EngineError::Contention`.
365pub(super) async fn cancel_impl(
366    pool: &PgPool,
367    _partition_config: &PartitionConfig,
368    execution_id: &ExecutionId,
369    reason: &str,
370) -> Result<(), EngineError> {
371    let partition_key: i16 = execution_id.partition() as i16;
372    let eid_uuid = eid_uuid(execution_id);
373    let now = now_ms();
374
375    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
376
377    let current: Option<(String, String)> = sqlx::query_as(
378        r#"
379        SELECT lifecycle_phase, public_state
380        FROM ff_exec_core
381        WHERE partition_key = $1 AND execution_id = $2
382        FOR UPDATE
383        "#,
384    )
385    .bind(partition_key)
386    .bind(eid_uuid)
387    .fetch_optional(&mut *tx)
388    .await
389    .map_err(map_sqlx_error)?;
390
391    let Some((lifecycle_phase, public_state)) = current else {
392        // Not-found on a cancel is an operator-visible state error —
393        // matches the Valkey FCALL's `execution_not_found` code.
394        tx.rollback().await.map_err(map_sqlx_error)?;
395        return Err(EngineError::Validation {
396            kind: ValidationKind::InvalidInput,
397            detail: format!(
398                "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
399            ),
400        });
401    };
402
403    // Terminal-state replay is a successful no-op. Mirrors Valkey's
404    // `reconcile_terminal_replay` path.
405    if lifecycle_phase == "terminal" {
406        tx.rollback().await.map_err(map_sqlx_error)?;
407        // Idempotent success iff already cancelled; other terminal states
408        // (completed/failed/expired/skipped) surface as a state conflict
409        // so operators don't silently squash a real terminal outcome.
410        return if public_state == "cancelled" {
411            Ok(())
412        } else {
413            Err(EngineError::Validation {
414                kind: ValidationKind::InvalidInput,
415                detail: format!(
416                    "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
417                ),
418            })
419        };
420    }
421
422    sqlx::query(
423        r#"
424        UPDATE ff_exec_core
425        SET lifecycle_phase     = 'terminal',
426            ownership_state     = 'unowned',
427            eligibility_state   = 'not_applicable',
428            public_state        = 'cancelled',
429            attempt_state       = 'cancelled',
430            terminal_at_ms      = $3,
431            cancellation_reason = $4,
432            cancelled_by        = 'worker',
433            raw_fields          = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
434        WHERE partition_key = $1 AND execution_id = $2
435        "#,
436    )
437    .bind(partition_key)
438    .bind(eid_uuid)
439    .bind(now)
440    .bind(reason)
441    .execute(&mut *tx)
442    .await
443    .map_err(map_sqlx_error)?;
444
445    tx.commit().await.map_err(map_sqlx_error)?;
446    Ok(())
447}
448
449// ─── resolve_execution_flow_id ───────────────────────────────────
450
451pub(super) async fn resolve_execution_flow_id_impl(
452    pool: &PgPool,
453    _partition_config: &PartitionConfig,
454    eid: &ExecutionId,
455) -> Result<Option<FlowId>, EngineError> {
456    let partition_key: i16 = eid.partition() as i16;
457    let execution_id = eid_uuid(eid);
458
459    let row: Option<(Option<Uuid>,)> = sqlx::query_as(
460        r#"
461        SELECT flow_id
462        FROM ff_exec_core
463        WHERE partition_key = $1 AND execution_id = $2
464        "#,
465    )
466    .bind(partition_key)
467    .bind(execution_id)
468    .fetch_optional(pool)
469    .await
470    .map_err(map_sqlx_error)?;
471
472    let Some((maybe_fid,)) = row else {
473        return Ok(None);
474    };
475    let Some(fid_uuid) = maybe_fid else {
476        return Ok(None);
477    };
478    let s = fid_uuid.to_string();
479    FlowId::parse(&s)
480        .map(Some)
481        .map_err(|e| EngineError::Validation {
482            kind: ValidationKind::Corruption,
483            detail: format!(
484                "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
485            ),
486        })
487}
488
489// ─── RFC-020 Wave 9 Spine-B — read model (3 methods) ─────────────
490//
491// All three reads project from the `ff_exec_core` row for the target
492// `(partition_key, execution_id)`; `read_execution_info` additionally
493// LATERAL-joins `ff_attempt` for the current-attempt row (outcome +
494// started_at). READ COMMITTED is sufficient — all three are
495// single-query, read-only, no CAS. Per RFC §4.1 + §7.8,
496// `get_execution_result` is current-attempt semantics (matches Valkey's
497// `GET ctx.result()` primitive; `result` column on `ff_exec_core`).
498//
499// Postgres storage strings for the 6 state-vector dimensions are a
500// superset of the in-core enum literals (`cancelled`, `blocked`,
501// `eligible`, `pending`, `released`, `running`, `pending_claim`, …).
502// These Postgres-specific literals are collapsed to the closest
503// user-facing enum variant by the normalisation helpers below before
504// JSON-deserialising; unknown tokens surface as `Corruption` errors
505// rather than silently defaulting.
506
507// Normalisation maps: `ff_exec_core` literal → closest
508// serde-snake_case enum variant. Each arm is grounded in an actual
509// write site in this crate; unknown tokens fall through so
510// `json_enum!` surfaces `Corruption` loudly.
511
512fn normalise_lifecycle_phase(raw: &str) -> &str {
513    match raw {
514        // Terminal family — `cancelled` is Postgres-specific
515        // (flow.rs:674 writes it directly), `terminal` is canonical.
516        "cancelled" | "terminal" => "terminal",
517        // Pre-runnable + runnable variants collapse to `Runnable`
518        // (the user-facing enum only distinguishes 5 phases). `blocked`
519        // is a legacy literal still referenced in dispatch.rs:352's
520        // CASE clause but no longer written by current paths.
521        "pending" | "runnable" | "eligible" | "blocked" => "runnable",
522        "active" => "active",
523        "suspended" => "suspended",
524        "submitted" => "submitted",
525        other => other,
526    }
527}
528
529fn normalise_ownership_state(raw: &str) -> &str {
530    match raw {
531        // `released` is a Postgres-specific post-suspension marker
532        // (suspend_ops.rs:585); nearest enum variant is `Unowned`.
533        "released" | "unowned" => "unowned",
534        "leased" => "leased",
535        "lease_expired_reclaimable" => "lease_expired_reclaimable",
536        "lease_revoked" => "lease_revoked",
537        other => other,
538    }
539}
540
541fn normalise_eligibility_state(raw: &str) -> &str {
542    match raw {
543        // Terminal-cancelled rows → `NotApplicable` (Valkey parity).
544        "cancelled" => "not_applicable",
545        // Scheduler ClaimGrant transitional state; nearest user-facing
546        // variant is still `EligibleNow` (the exec is about to be
547        // claimed — scheduler has picked it but the worker hasn't
548        // acknowledged yet).
549        "pending_claim" => "eligible_now",
550        other => other,
551    }
552}
553
554fn normalise_attempt_state(raw: &str) -> &str {
555    match raw {
556        // `pending` is the Postgres initial-insert literal
557        // (exec_core.rs:166); `pending_claim` is the scheduler transitional
558        // write. Both collapse to `PendingFirstAttempt`.
559        "pending" | "pending_claim" => "pending_first_attempt",
560        // Bare `running` from the suspension-resume claim path
561        // (suspend_ops.rs:958); canonical form is `running_attempt`.
562        "running" => "running_attempt",
563        // `cancelled` attempt_state (flow.rs cancel path) has no direct
564        // enum variant; nearest is `AttemptTerminal`.
565        "cancelled" => "attempt_terminal",
566        other => other,
567    }
568}
569
570/// Collapse Postgres `public_state` literals to the
571/// `PublicState` snake_case serde form.
572fn normalise_public_state(raw: &str) -> &str {
573    match raw {
574        // Postgres writes the bare `running` literal on the
575        // resume-claim path (suspend_ops.rs:958). Valkey / the
576        // `PublicState` enum spell it `active`.
577        "running" => "active",
578        other => other,
579    }
580}
581
582macro_rules! json_enum {
583    ($ty:ty, $field:expr, $raw:expr) => {{
584        let quoted = format!("\"{}\"", $raw);
585        serde_json::from_str::<$ty>(&quoted).map_err(|e| EngineError::Validation {
586            kind: ValidationKind::Corruption,
587            detail: format!(
588                "exec_core: {}: '{}' is not a known value: {}",
589                $field, $raw, e
590            ),
591        })
592    }};
593}
594
595/// Map an `ff_attempt.outcome` string to a [`TerminalOutcome`]. Only
596/// meaningful when `lifecycle_phase` is terminal/cancelled; otherwise
597/// returns `TerminalOutcome::None`.
598fn derive_terminal_outcome(
599    phase_norm: &str,
600    phase_raw: &str,
601    attempt_outcome: Option<&str>,
602) -> TerminalOutcome {
603    if phase_norm != "terminal" {
604        return TerminalOutcome::None;
605    }
606    if phase_raw == "cancelled" {
607        return TerminalOutcome::Cancelled;
608    }
609    match attempt_outcome {
610        Some("success") => TerminalOutcome::Success,
611        Some("failed") => TerminalOutcome::Failed,
612        Some("cancelled") => TerminalOutcome::Cancelled,
613        Some("expired") => TerminalOutcome::Expired,
614        Some("skipped") => TerminalOutcome::Skipped,
615        _ => TerminalOutcome::None,
616    }
617}
618
619/// RFC-020 §4.1 — `read_execution_state`: single-column point read of
620/// `public_state`. `Ok(None)` when the execution is missing.
621pub(super) async fn read_execution_state_impl(
622    pool: &PgPool,
623    _partition_config: &PartitionConfig,
624    id: &ExecutionId,
625) -> Result<Option<PublicState>, EngineError> {
626    let partition_key: i16 = id.partition() as i16;
627    let execution_id = eid_uuid(id);
628
629    let row: Option<(String,)> = sqlx::query_as(
630        r#"
631        SELECT public_state
632        FROM ff_exec_core
633        WHERE partition_key = $1 AND execution_id = $2
634        "#,
635    )
636    .bind(partition_key)
637    .bind(execution_id)
638    .fetch_optional(pool)
639    .await
640    .map_err(map_sqlx_error)?;
641
642    let Some((raw,)) = row else {
643        return Ok(None);
644    };
645    let parsed: PublicState =
646        json_enum!(PublicState, "public_state", normalise_public_state(&raw))?;
647    Ok(Some(parsed))
648}
649
650/// RFC-020 §4.1 — `read_execution_info`: multi-column projection of
651/// `ff_exec_core` + `LEFT JOIN LATERAL` on `ff_attempt` (current attempt
652/// row) to build [`ExecutionInfo`]. `Ok(None)` when the execution is
653/// missing. Partition-local (both tables co-located on `partition_key`,
654/// RFC-011).
655pub(super) async fn read_execution_info_impl(
656    pool: &PgPool,
657    _partition_config: &PartitionConfig,
658    id: &ExecutionId,
659) -> Result<Option<ExecutionInfo>, EngineError> {
660    let partition_key: i16 = id.partition() as i16;
661    let execution_id = eid_uuid(id);
662
663    // LATERAL joins: `cur` pins to the current attempt (for
664    // `outcome` — drives `TerminalOutcome`); `first` pins to the
665    // earliest attempt row on the execution (attempt_index ASC — drives
666    // `started_at`, which per `ExecutionInfo` contract is the
667    // first-claim timestamp preserved across retries — Valkey parity).
668    let row = sqlx::query(
669        r#"
670        SELECT ec.flow_id,
671               ec.lane_id,
672               ec.priority,
673               ec.lifecycle_phase,
674               ec.ownership_state,
675               ec.eligibility_state,
676               ec.public_state,
677               ec.attempt_state,
678               ec.blocking_reason,
679               ec.attempt_index,
680               ec.created_at_ms,
681               ec.terminal_at_ms,
682               ec.raw_fields,
683               cur.outcome       AS attempt_outcome,
684               first_att.started_at_ms AS first_started_at_ms
685        FROM ff_exec_core ec
686        LEFT JOIN LATERAL (
687            SELECT outcome
688            FROM ff_attempt
689            WHERE partition_key = ec.partition_key
690              AND execution_id  = ec.execution_id
691              AND attempt_index = ec.attempt_index
692        ) cur ON TRUE
693        LEFT JOIN LATERAL (
694            SELECT started_at_ms
695            FROM ff_attempt
696            WHERE partition_key = ec.partition_key
697              AND execution_id  = ec.execution_id
698              AND started_at_ms IS NOT NULL
699            ORDER BY attempt_index ASC
700            LIMIT 1
701        ) first_att ON TRUE
702        WHERE ec.partition_key = $1 AND ec.execution_id = $2
703        "#,
704    )
705    .bind(partition_key)
706    .bind(execution_id)
707    .fetch_optional(pool)
708    .await
709    .map_err(map_sqlx_error)?;
710
711    let Some(row) = row else {
712        return Ok(None);
713    };
714
715    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
716    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
717    let priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
718    let lifecycle_phase_raw: String =
719        row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
720    let ownership_state_raw: String =
721        row.try_get("ownership_state").map_err(map_sqlx_error)?;
722    let eligibility_state_raw: String =
723        row.try_get("eligibility_state").map_err(map_sqlx_error)?;
724    let public_state_raw: String = row.try_get("public_state").map_err(map_sqlx_error)?;
725    let attempt_state_raw: String = row.try_get("attempt_state").map_err(map_sqlx_error)?;
726    let blocking_reason_opt: Option<String> =
727        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
728    let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
729    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
730    let terminal_at_ms_opt: Option<i64> =
731        row.try_get("terminal_at_ms").map_err(map_sqlx_error)?;
732    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
733    let attempt_outcome_opt: Option<String> =
734        row.try_get("attempt_outcome").map_err(map_sqlx_error)?;
735    let first_started_at_ms_opt: Option<i64> =
736        row.try_get("first_started_at_ms").map_err(map_sqlx_error)?;
737
738    let lifecycle_phase: LifecyclePhase = json_enum!(
739        LifecyclePhase,
740        "lifecycle_phase",
741        normalise_lifecycle_phase(&lifecycle_phase_raw)
742    )?;
743    let ownership_state: OwnershipState = json_enum!(
744        OwnershipState,
745        "ownership_state",
746        normalise_ownership_state(&ownership_state_raw)
747    )?;
748    let eligibility_state: EligibilityState = json_enum!(
749        EligibilityState,
750        "eligibility_state",
751        normalise_eligibility_state(&eligibility_state_raw)
752    )?;
753    let public_state: PublicState = json_enum!(
754        PublicState,
755        "public_state",
756        normalise_public_state(&public_state_raw)
757    )?;
758    let attempt_state: AttemptState = json_enum!(
759        AttemptState,
760        "attempt_state",
761        normalise_attempt_state(&attempt_state_raw)
762    )?;
763    let blocking_reason: BlockingReason = match blocking_reason_opt
764        .as_deref()
765        .filter(|s| !s.is_empty())
766    {
767        None => BlockingReason::None,
768        Some(raw) => json_enum!(BlockingReason, "blocking_reason", raw)?,
769    };
770    let terminal_outcome = derive_terminal_outcome(
771        normalise_lifecycle_phase(&lifecycle_phase_raw),
772        &lifecycle_phase_raw,
773        attempt_outcome_opt.as_deref(),
774    );
775
776    let state_vector = StateVector {
777        lifecycle_phase,
778        ownership_state,
779        eligibility_state,
780        blocking_reason,
781        terminal_outcome,
782        attempt_state,
783        public_state,
784    };
785
786    // Scalar fields from raw_fields JSON (namespace, execution_kind,
787    // blocking_detail). Same shape as `describe_execution_impl` reads.
788    let mut namespace = String::new();
789    let mut execution_kind = String::new();
790    let mut blocking_detail = String::new();
791    if let JsonValue::Object(map) = &raw_fields {
792        if let Some(JsonValue::String(s)) = map.get("namespace") {
793            namespace = s.clone();
794        }
795        if let Some(JsonValue::String(s)) = map.get("execution_kind") {
796            execution_kind = s.clone();
797        }
798        if let Some(JsonValue::String(s)) = map.get("blocking_detail") {
799            blocking_detail = s.clone();
800        }
801    }
802
803    // `ExecutionInfo.flow_id` is the bare UUID wire form of `FlowId`
804    // (per `uuid_id!` macro — no hash-tag prefix). Valkey stores the
805    // bare UUID in `exec_core["flow_id"]` too (Valkey parity).
806    let flow_id = flow_id_uuid.map(|fid| fid.to_string());
807
808    Ok(Some(ExecutionInfo {
809        execution_id: id.clone(),
810        namespace,
811        lane_id,
812        priority,
813        execution_kind,
814        state_vector,
815        public_state,
816        created_at: created_at_ms.to_string(),
817        started_at: first_started_at_ms_opt.map(|v| v.to_string()),
818        completed_at: terminal_at_ms_opt.map(|v| v.to_string()),
819        current_attempt_index: attempt_index.max(0) as u32,
820        flow_id,
821        blocking_detail,
822    }))
823}
824
825/// RFC-020 §4.1 + §7.8 — `get_execution_result`: current-attempt
826/// semantics (matches Valkey's `GET ctx.result()`). Reads the `result`
827/// column from `ff_exec_core`; `Ok(None)` when the execution is missing
828/// or the result is `NULL` (not yet terminal, or cancelled without a
829/// payload).
830pub(super) async fn get_execution_result_impl(
831    pool: &PgPool,
832    _partition_config: &PartitionConfig,
833    id: &ExecutionId,
834) -> Result<Option<Vec<u8>>, EngineError> {
835    let partition_key: i16 = id.partition() as i16;
836    let execution_id = eid_uuid(id);
837
838    let row: Option<(Option<Vec<u8>>,)> = sqlx::query_as(
839        r#"
840        SELECT result
841        FROM ff_exec_core
842        WHERE partition_key = $1 AND execution_id = $2
843        "#,
844    )
845    .bind(partition_key)
846    .bind(execution_id)
847    .fetch_optional(pool)
848    .await
849    .map_err(map_sqlx_error)?;
850
851    match row {
852        None => Ok(None),
853        Some((payload,)) => Ok(payload),
854    }
855}
856