Skip to main content

ff_backend_postgres/
exec_core.rs

1//! exec_core trait-method family — Postgres impls.
2//!
3//! **RFC-v0.7 Wave 4 (Agent A).** Implements the five exec-core write
4//! + read methods for the Postgres backend:
5//!
//!  1. [`create_execution_impl`] — inherent entry on
//!     [`super::PostgresBackend`]. Replaces the Valkey `ff_create_execution`
//!     FCALL with an `INSERT ... ON CONFLICT (partition_key, execution_id)
//!     DO NOTHING`, so replaying the same execution id mirrors the FCALL's
//!     `Duplicate` outcome (the idempotency key is stored in `raw_fields`
//!     but is not used for conflict detection). In the same transaction it
//!     seeds the global [`ff_lane_registry`] row on first sight of a lane
//!     (Q6 adjudication — dynamic lanes get a registry entry here as well
//!     as at server-boot seeding).
13//!  2. [`describe_execution_impl`] — single-row `SELECT` against
14//!     `ff_exec_core`, decoded into [`ExecutionSnapshot`] via the
15//!     shared [`ff_core::contracts::decode::build_execution_snapshot`]
16//!     helper so the snapshot shape matches the Valkey backend
17//!     bit-for-bit.
18//!  3. [`list_executions_impl`] — cursor-paginated forward scan over
19//!     one `partition_key`, `ORDER BY execution_id ASC`. Uses the
20//!     N+1 trick (fetch `limit+1` rows) to decide `next_cursor`.
21//!  4. [`cancel_impl`] — transactional `SELECT ... FOR UPDATE` +
22//!     `UPDATE` to transition the exec row to
23//!     `public_state='cancelled'`, `lifecycle_phase='terminal'`,
24//!     setting `terminal_at_ms` + `cancellation_reason`. Per Q11 the
25//!     default READ COMMITTED isolation suffices; the row lock
26//!     narrows the RMW to one writer.
//!  5. [`resolve_execution_flow_id_impl`] — one-column (`flow_id`)
//!     point lookup by the full primary key (`partition_key`,
//!     `execution_id`) for the admin-tooling path.
30//!
31//! Spec authority: `rfcs/drafts/v0.7-migration-master.md` Q5 (partition
32//! math), Q11 (isolation), Q14 (dual-backend — no cross-backend
33//! fields).
34
35use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{CreateExecutionArgs, ExecutionSnapshot, ListExecutionsPage};
40use ff_core::engine_error::{EngineError, ValidationKind};
41use ff_core::partition::{PartitionConfig, PartitionKey};
42use ff_core::types::{ExecutionId, FlowId};
43use serde_json::Value as JsonValue;
44use sqlx::{PgPool, Row};
45use uuid::Uuid;
46
47use crate::error::map_sqlx_error;
48
49/// Extract the raw UUID suffix from an [`ExecutionId`]'s wire form
50/// (`"{fp:N}:<uuid>"`). The constructors / [`ExecutionId::parse`]
51/// guarantee the `}:` separator exists and the suffix is a valid
52/// UUID.
53fn eid_uuid(eid: &ExecutionId) -> Uuid {
54    let s = eid.as_str();
55    // Shape is enforced by ExecutionId constructors: `{fp:N}:<uuid>`.
56    let suffix = s
57        .split_once("}:")
58        .map(|(_, u)| u)
59        .expect("ExecutionId has `}:` separator (invariant)");
60    Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
61}
62
63/// Build a typed [`FlowId`] / [`ExecutionId`] string back from a
64/// partition index + UUID. Keeps the `{fp:N}:<uuid>` wire shape the
65/// rest of FF expects.
66fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
67    let s = format!("{{fp:{partition}}}:{uuid}");
68    ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
69        kind: ValidationKind::Corruption,
70        detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
71    })
72}
73
74fn now_ms() -> i64 {
75    let d = SystemTime::now()
76        .duration_since(UNIX_EPOCH)
77        .expect("clock is after UNIX_EPOCH");
78    (d.as_millis() as i64).max(0)
79}
80
81// ─── create_execution ─────────────────────────────────────────────
82
83/// Insert one `ff_exec_core` row (idempotent on primary key) + seed
84/// the lane registry.
85///
86/// Idempotent replay: on primary-key conflict we treat the call as a
87/// successful duplicate and return `Ok(args.execution_id)` — matching
88/// `CreateExecutionResult::Duplicate` from the Valkey FCALL path.
89pub(super) async fn create_execution_impl(
90    pool: &PgPool,
91    _partition_config: &PartitionConfig,
92    args: CreateExecutionArgs,
93) -> Result<ExecutionId, EngineError> {
94    let partition_key: i16 = args.execution_id.partition() as i16;
95    let execution_id = eid_uuid(&args.execution_id);
96    let lane_id = args.lane_id.as_str().to_owned();
97    let priority: i32 = args.priority;
98    let created_at_ms: i64 = args.now.0;
99    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
100
101    // `raw_fields` carries every CreateExecution arg that doesn't map
102    // to a typed column today. Describe_execution reads them back via
103    // `build_execution_snapshot`, which speaks HashMap<String,String>
104    // — so we store strings here.
105    let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
106    raw.insert(
107        "namespace".into(),
108        JsonValue::String(args.namespace.as_str().to_owned()),
109    );
110    raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
111    raw.insert(
112        "creator_identity".into(),
113        JsonValue::String(args.creator_identity),
114    );
115    if let Some(k) = args.idempotency_key {
116        raw.insert("idempotency_key".into(), JsonValue::String(k));
117    }
118    if let Some(enc) = args.payload_encoding {
119        raw.insert("payload_encoding".into(), JsonValue::String(enc));
120    }
121    // last_mutation_at mirrors Valkey's exec_core field — initialised
122    // to created_at on first write.
123    raw.insert(
124        "last_mutation_at".into(),
125        JsonValue::String(created_at_ms.to_string()),
126    );
127    raw.insert(
128        "total_attempt_count".into(),
129        JsonValue::String("0".to_owned()),
130    );
131    // Tags live under raw_fields.tags as a JSON object keyed by tag name.
132    let tags_json: serde_json::Map<String, JsonValue> = args
133        .tags
134        .into_iter()
135        .map(|(k, v)| (k, JsonValue::String(v)))
136        .collect();
137    raw.insert("tags".into(), JsonValue::Object(tags_json));
138
139    let raw_fields = JsonValue::Object(raw);
140    let policy_json: Option<JsonValue> = match args.policy {
141        Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
142            kind: ValidationKind::InvalidInput,
143            detail: format!("create_execution: policy: serialize failed: {e}"),
144        })?),
145        None => None,
146    };
147
148    // Create exec row + seed lane registry in one transaction so a
149    // concurrent lane-list doesn't see an exec without the lane
150    // registry row.
151    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
152
153    sqlx::query(
154        r#"
155        INSERT INTO ff_exec_core (
156            partition_key, execution_id, flow_id, lane_id,
157            required_capabilities, attempt_index,
158            lifecycle_phase, ownership_state, eligibility_state,
159            public_state, attempt_state,
160            priority, created_at_ms, deadline_at_ms,
161            payload, policy, raw_fields
162        ) VALUES (
163            $1, $2, NULL, $3,
164            '{}'::text[], 0,
165            'submitted', 'unowned', 'eligible_now',
166            'waiting', 'pending',
167            $4, $5, $6,
168            $7, $8, $9
169        )
170        ON CONFLICT (partition_key, execution_id) DO NOTHING
171        "#,
172    )
173    .bind(partition_key)
174    .bind(execution_id)
175    .bind(&lane_id)
176    .bind(priority)
177    .bind(created_at_ms)
178    .bind(deadline_at_ms)
179    .bind(&args.input_payload)
180    .bind(policy_json)
181    .bind(&raw_fields)
182    .execute(&mut *tx)
183    .await
184    .map_err(map_sqlx_error)?;
185
186    // Lane registry: Q6 — dynamic lanes seed here on first use.
187    sqlx::query(
188        r#"
189        INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
190        VALUES ($1, $2, $3)
191        ON CONFLICT (lane_id) DO NOTHING
192        "#,
193    )
194    .bind(&lane_id)
195    .bind(created_at_ms)
196    .bind("create_execution")
197    .execute(&mut *tx)
198    .await
199    .map_err(map_sqlx_error)?;
200
201    tx.commit().await.map_err(map_sqlx_error)?;
202
203    Ok(args.execution_id)
204}
205
206// ─── describe_execution ──────────────────────────────────────────
207
208pub(super) async fn describe_execution_impl(
209    pool: &PgPool,
210    _partition_config: &PartitionConfig,
211    id: &ExecutionId,
212) -> Result<Option<ExecutionSnapshot>, EngineError> {
213    let partition_key: i16 = id.partition() as i16;
214    let execution_id = eid_uuid(id);
215
216    let row = sqlx::query(
217        r#"
218        SELECT flow_id, lane_id, public_state, blocking_reason,
219               created_at_ms, raw_fields
220        FROM ff_exec_core
221        WHERE partition_key = $1 AND execution_id = $2
222        "#,
223    )
224    .bind(partition_key)
225    .bind(execution_id)
226    .fetch_optional(pool)
227    .await
228    .map_err(map_sqlx_error)?;
229
230    let Some(row) = row else {
231        return Ok(None);
232    };
233
234    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
235    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
236    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
237    let blocking_reason: Option<String> =
238        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
239    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
240    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
241
242    // Build the HashMap<String,String> shape the shared decoder
243    // consumes. The Postgres row projection + JSON raw_fields together
244    // carry every field `build_execution_snapshot` reads.
245    let mut core: HashMap<String, String> = HashMap::new();
246    core.insert("public_state".into(), public_state);
247    core.insert("lane_id".into(), lane_id);
248    if let Some(fid) = flow_id_uuid {
249        // Reassemble `{fp:N}:<uuid>` using the exec's own partition
250        // (RFC-011 co-location: exec + flow share a partition).
251        core.insert(
252            "flow_id".into(),
253            format!("{{fp:{part}}}:{fid}", part = id.partition()),
254        );
255    }
256    if let Some(r) = blocking_reason {
257        core.insert("blocking_reason".into(), r);
258    }
259    core.insert("created_at".into(), created_at_ms.to_string());
260
261    // raw_fields-derived scalars. build_execution_snapshot hard-requires
262    // `last_mutation_at`; `create_execution_impl` seeds it to
263    // `created_at_ms`, subsequent mutators (future waves) bump it.
264    if let JsonValue::Object(map) = &raw_fields {
265        for key in [
266            "namespace",
267            "last_mutation_at",
268            "total_attempt_count",
269            "current_attempt_id",
270            "current_attempt_index",
271            "current_waitpoint_id",
272            "blocking_detail",
273        ] {
274            if let Some(JsonValue::String(s)) = map.get(key) {
275                core.insert(key.to_owned(), s.clone());
276            }
277        }
278    }
279
280    // Tags hash: extract from raw_fields.tags JSON object.
281    let tags_raw: HashMap<String, String> = match &raw_fields {
282        JsonValue::Object(map) => match map.get("tags") {
283            Some(JsonValue::Object(tag_map)) => tag_map
284                .iter()
285                .filter_map(|(k, v)| {
286                    v.as_str().map(|s| (k.clone(), s.to_owned()))
287                })
288                .collect(),
289            _ => HashMap::new(),
290        },
291        _ => HashMap::new(),
292    };
293
294    build_execution_snapshot(id.clone(), &core, tags_raw)
295}
296
297// ─── list_executions ─────────────────────────────────────────────
298
299pub(super) async fn list_executions_impl(
300    pool: &PgPool,
301    _partition_config: &PartitionConfig,
302    partition: PartitionKey,
303    cursor: Option<ExecutionId>,
304    limit: usize,
305) -> Result<ListExecutionsPage, EngineError> {
306    if limit == 0 {
307        return Ok(ListExecutionsPage::new(Vec::new(), None));
308    }
309    // Parse the partition tag into a partition index (u16).
310    let parsed = partition.parse().map_err(|e| EngineError::Validation {
311        kind: ValidationKind::InvalidInput,
312        detail: format!("list_executions: partition: '{partition}': {e}"),
313    })?;
314    let partition_key: i16 = parsed.index as i16;
315    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
316
317    // N+1 trick: fetch one extra row to decide `next_cursor`.
318    let effective_limit = limit.min(1000);
319    let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
320
321    let rows = sqlx::query(
322        r#"
323        SELECT execution_id
324        FROM ff_exec_core
325        WHERE partition_key = $1
326          AND ($2::uuid IS NULL OR execution_id > $2)
327        ORDER BY execution_id ASC
328        LIMIT $3
329        "#,
330    )
331    .bind(partition_key)
332    .bind(cursor_uuid)
333    .bind(fetch_limit)
334    .fetch_all(pool)
335    .await
336    .map_err(map_sqlx_error)?;
337
338    let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
339    for row in &rows {
340        let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
341        ids.push(eid_from_parts(parsed.index, u)?);
342    }
343
344    let has_more = ids.len() > effective_limit;
345    if has_more {
346        ids.truncate(effective_limit);
347    }
348    let next_cursor = if has_more { ids.last().cloned() } else { None };
349    Ok(ListExecutionsPage::new(ids, next_cursor))
350}
351
352// ─── cancel ──────────────────────────────────────────────────────
353
354/// Cancel one execution by handle (single-execution cancel; flow-wide
355/// cancel is the sibling Wave-4c agent's lane).
356///
357/// Q11: runs under READ COMMITTED with an explicit row lock via
358/// `SELECT ... FOR UPDATE` to serialise the state check + UPDATE.
359/// Already-terminal executions are a successful no-op (idempotent
360/// replay); mismatched lease surfaces as `EngineError::Contention`.
361pub(super) async fn cancel_impl(
362    pool: &PgPool,
363    _partition_config: &PartitionConfig,
364    execution_id: &ExecutionId,
365    reason: &str,
366) -> Result<(), EngineError> {
367    let partition_key: i16 = execution_id.partition() as i16;
368    let eid_uuid = eid_uuid(execution_id);
369    let now = now_ms();
370
371    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
372
373    let current: Option<(String, String)> = sqlx::query_as(
374        r#"
375        SELECT lifecycle_phase, public_state
376        FROM ff_exec_core
377        WHERE partition_key = $1 AND execution_id = $2
378        FOR UPDATE
379        "#,
380    )
381    .bind(partition_key)
382    .bind(eid_uuid)
383    .fetch_optional(&mut *tx)
384    .await
385    .map_err(map_sqlx_error)?;
386
387    let Some((lifecycle_phase, public_state)) = current else {
388        // Not-found on a cancel is an operator-visible state error —
389        // matches the Valkey FCALL's `execution_not_found` code.
390        tx.rollback().await.map_err(map_sqlx_error)?;
391        return Err(EngineError::Validation {
392            kind: ValidationKind::InvalidInput,
393            detail: format!(
394                "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
395            ),
396        });
397    };
398
399    // Terminal-state replay is a successful no-op. Mirrors Valkey's
400    // `reconcile_terminal_replay` path.
401    if lifecycle_phase == "terminal" {
402        tx.rollback().await.map_err(map_sqlx_error)?;
403        // Idempotent success iff already cancelled; other terminal states
404        // (completed/failed/expired/skipped) surface as a state conflict
405        // so operators don't silently squash a real terminal outcome.
406        return if public_state == "cancelled" {
407            Ok(())
408        } else {
409            Err(EngineError::Validation {
410                kind: ValidationKind::InvalidInput,
411                detail: format!(
412                    "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
413                ),
414            })
415        };
416    }
417
418    sqlx::query(
419        r#"
420        UPDATE ff_exec_core
421        SET lifecycle_phase     = 'terminal',
422            ownership_state     = 'unowned',
423            eligibility_state   = 'not_applicable',
424            public_state        = 'cancelled',
425            attempt_state       = 'cancelled',
426            terminal_at_ms      = $3,
427            cancellation_reason = $4,
428            cancelled_by        = 'worker',
429            raw_fields          = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
430        WHERE partition_key = $1 AND execution_id = $2
431        "#,
432    )
433    .bind(partition_key)
434    .bind(eid_uuid)
435    .bind(now)
436    .bind(reason)
437    .execute(&mut *tx)
438    .await
439    .map_err(map_sqlx_error)?;
440
441    tx.commit().await.map_err(map_sqlx_error)?;
442    Ok(())
443}
444
445// ─── resolve_execution_flow_id ───────────────────────────────────
446
447pub(super) async fn resolve_execution_flow_id_impl(
448    pool: &PgPool,
449    _partition_config: &PartitionConfig,
450    eid: &ExecutionId,
451) -> Result<Option<FlowId>, EngineError> {
452    let partition_key: i16 = eid.partition() as i16;
453    let execution_id = eid_uuid(eid);
454
455    let row: Option<(Option<Uuid>,)> = sqlx::query_as(
456        r#"
457        SELECT flow_id
458        FROM ff_exec_core
459        WHERE partition_key = $1 AND execution_id = $2
460        "#,
461    )
462    .bind(partition_key)
463    .bind(execution_id)
464    .fetch_optional(pool)
465    .await
466    .map_err(map_sqlx_error)?;
467
468    let Some((maybe_fid,)) = row else {
469        return Ok(None);
470    };
471    let Some(fid_uuid) = maybe_fid else {
472        return Ok(None);
473    };
474    let s = fid_uuid.to_string();
475    FlowId::parse(&s)
476        .map(Some)
477        .map_err(|e| EngineError::Validation {
478            kind: ValidationKind::Corruption,
479            detail: format!(
480                "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
481            ),
482        })
483}
484