Skip to main content

ff_backend_postgres/
flow_staging.rs

1//! Postgres flow-staging inherent methods (Wave 4i).
2//!
3//! Mirrors the ingress-layer Lua FCALLs that the Valkey backend ships
4//! under `ff-server::Server` — these are **not** `EngineBackend` trait
5//! methods. The Valkey shape is:
6//!
7//!   * `ff_create_flow`                — INSERT flow_core
8//!   * `ff_add_execution_to_flow`      — stamp exec.flow_id + bump counters
9//!   * `ff_stage_dependency_edge`      — CAS graph_revision + INSERT edge
10//!   * `ff_apply_dependency_to_child`  — mark edge applied + bump edge_group
11//!
12//! Wave 4c already covers the read surface and `cancel_flow`. This
13//! module adds the mutating ingress operations so flow-DAG construction
14//! works on Postgres end-to-end. Wave 5a owns the completion-cascade
15//! dispatch that consumes what `apply_dependency_to_child` writes.
16//!
17//! # Storage convention
18//!
19//! Matches Wave 4c's decoder in `flow.rs`:
20//!
21//!   * Flow-level extras (`flow_kind`, `namespace`, `node_count`,
22//!     `edge_count`, `last_mutation_at_ms`) live in
23//!     `ff_flow_core.raw_fields` jsonb.
24//!   * Edge-level extras (`dependency_kind`, `data_passing_ref`,
25//!     `staged_at_ms`, `applied_at_ms`, `edge_state`, `created_at_ms`,
26//!     `created_by`, `satisfaction_condition`) live in
27//!     `ff_edge.policy` jsonb (the only jsonb slot on that row).
28//!
29//! No schema migration is required — every new field fits in the
30//! existing jsonb columns. This keeps Wave 4c's decoder and the
31//! dispatch queries (which select on typed columns only) unchanged.
32//!
33//! # Membership
34//!
35//! `ff_add_execution_to_flow` / `ff_stage_dependency_edge` treat
36//! membership as `ff_exec_core.flow_id = flow_id` — the same back-
37//! pointer that Wave 4c's `cancel_flow` already queries. No separate
38//! member table is introduced; RFC-011 flow/exec co-location makes the
39//! back-pointer authoritative on the flow's partition.
40
41use ff_core::contracts::{
42    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
43    ApplyDependencyToChildResult, CreateFlowArgs, CreateFlowResult, StageDependencyEdgeArgs,
44    StageDependencyEdgeResult,
45};
46use ff_core::engine_error::{ConflictKind, ContentionKind, EngineError, ValidationKind};
47use ff_core::partition::PartitionConfig;
48use ff_core::types::{ExecutionId, FlowId};
49use serde_json::json;
50use sqlx::{PgPool, Row};
51use uuid::Uuid;
52
53use crate::error::map_sqlx_error;
54
55/// Compute the flow partition byte (0..=255). Matches Wave 4c's helper.
56fn flow_partition_byte(flow_id: &FlowId, pc: &PartitionConfig) -> i16 {
57    ff_core::partition::flow_partition(flow_id, pc).index as i16
58}
59
60/// Parse the UUID suffix from a `{fp:N}:<uuid>` exec id.
61fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
62    let s = eid.as_str();
63    let Some(colon) = s.rfind("}:") else {
64        return Err(EngineError::Validation {
65            kind: ValidationKind::InvalidInput,
66            detail: format!("execution_id missing '}}:' delimiter: {s}"),
67        });
68    };
69    Uuid::parse_str(&s[colon + 2..]).map_err(|e| EngineError::Validation {
70        kind: ValidationKind::InvalidInput,
71        detail: format!("execution_id uuid suffix: {e}"),
72    })
73}
74
75/// `ff_create_flow` — idempotent INSERT into `ff_flow_core`.
76///
77/// Mirrors `lua/flow.lua :: ff_create_flow`. Graph_revision starts at 0;
78/// `public_flow_state` starts at `'open'`. Typed columns carry what the
79/// engine filters on; `flow_kind`, `namespace`, `node_count`,
80/// `edge_count`, `last_mutation_at_ms` ride in `raw_fields` so Wave 4c's
81/// decoder picks them up.
82pub async fn create_flow(
83    pool: &PgPool,
84    pc: &PartitionConfig,
85    args: &CreateFlowArgs,
86) -> Result<CreateFlowResult, EngineError> {
87    let part = flow_partition_byte(&args.flow_id, pc);
88    let flow_uuid: Uuid = args.flow_id.0;
89    let now_ms = args.now.0;
90
91    let raw_fields = json!({
92        "flow_kind": args.flow_kind,
93        "namespace": args.namespace.as_str(),
94        "node_count": 0,
95        "edge_count": 0,
96        "last_mutation_at_ms": now_ms,
97    });
98
99    // ON CONFLICT DO NOTHING makes the insert idempotent. `RETURNING`
100    // fires only on the insert path; a conflict returns zero rows, so
101    // we can branch on the row count.
102    let inserted = sqlx::query(
103        r#"
104        INSERT INTO ff_flow_core
105            (partition_key, flow_id, graph_revision, public_flow_state,
106             created_at_ms, raw_fields)
107        VALUES ($1, $2, 0, 'open', $3, $4)
108        ON CONFLICT (partition_key, flow_id) DO NOTHING
109        RETURNING flow_id
110        "#,
111    )
112    .bind(part)
113    .bind(flow_uuid)
114    .bind(now_ms)
115    .bind(&raw_fields)
116    .fetch_optional(pool)
117    .await
118    .map_err(map_sqlx_error)?;
119
120    if inserted.is_some() {
121        Ok(CreateFlowResult::Created {
122            flow_id: args.flow_id.clone(),
123        })
124    } else {
125        Ok(CreateFlowResult::AlreadySatisfied {
126            flow_id: args.flow_id.clone(),
127        })
128    }
129}
130
131/// `ff_add_execution_to_flow` — stamp `exec.flow_id` + bump flow counters.
132///
133/// Single transaction:
134///   1. Validate flow exists and is not terminal.
135///   2. Validate exec exists.
136///   3. If exec already on this flow → idempotent `AlreadyMember`.
137///   4. If exec on a different flow → `Validation(InvalidInput)`
138///      (RFC-007: an exec belongs to at most one flow).
139///   5. Stamp `exec_core.flow_id` + bump `graph_revision` and
140///      `raw_fields.node_count` on `flow_core`.
141pub async fn add_execution_to_flow(
142    pool: &PgPool,
143    pc: &PartitionConfig,
144    args: &AddExecutionToFlowArgs,
145) -> Result<AddExecutionToFlowResult, EngineError> {
146    let part = flow_partition_byte(&args.flow_id, pc);
147    let flow_uuid: Uuid = args.flow_id.0;
148    let exec_uuid = parse_exec_uuid(&args.execution_id)?;
149    let now_ms = args.now.0;
150
151    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
152
153    // 1. Flow must exist + not be terminal.
154    let flow_row = sqlx::query(
155        "SELECT public_flow_state, raw_fields FROM ff_flow_core \
156         WHERE partition_key = $1 AND flow_id = $2 FOR UPDATE",
157    )
158    .bind(part)
159    .bind(flow_uuid)
160    .fetch_optional(&mut *tx)
161    .await
162    .map_err(map_sqlx_error)?;
163
164    let Some(flow_row) = flow_row else {
165        return Err(EngineError::Validation {
166            kind: ValidationKind::InvalidInput,
167            detail: "flow_not_found".to_string(),
168        });
169    };
170    let public_flow_state: String = flow_row.get("public_flow_state");
171    if matches!(
172        public_flow_state.as_str(),
173        "cancelled" | "completed" | "failed"
174    ) {
175        return Err(EngineError::Validation {
176            kind: ValidationKind::InvalidInput,
177            detail: "flow_already_terminal".to_string(),
178        });
179    }
180
181    // 2. Exec must exist. The partition co-location invariant
182    //    (RFC-011 §7.3) guarantees the exec row, if any, lives on the
183    //    flow's partition; use the flow partition_key directly.
184    let exec_row = sqlx::query(
185        "SELECT flow_id FROM ff_exec_core \
186         WHERE partition_key = $1 AND execution_id = $2 FOR UPDATE",
187    )
188    .bind(part)
189    .bind(exec_uuid)
190    .fetch_optional(&mut *tx)
191    .await
192    .map_err(map_sqlx_error)?;
193
194    let Some(exec_row) = exec_row else {
195        return Err(EngineError::Validation {
196            kind: ValidationKind::InvalidInput,
197            detail: "execution_not_found".to_string(),
198        });
199    };
200    let existing_flow_id: Option<Uuid> = exec_row.get("flow_id");
201
202    // 3. Idempotent: already on this flow.
203    if existing_flow_id == Some(flow_uuid) {
204        let raw: serde_json::Value = flow_row.get("raw_fields");
205        let nc = raw
206            .get("node_count")
207            .and_then(|v| v.as_u64())
208            .and_then(|n| u32::try_from(n).ok())
209            .unwrap_or(0);
210        tx.commit().await.map_err(map_sqlx_error)?;
211        return Ok(AddExecutionToFlowResult::AlreadyMember {
212            execution_id: args.execution_id.clone(),
213            node_count: nc,
214        });
215    }
216
217    // 4. Cross-flow refusal.
218    if let Some(other) = existing_flow_id
219        && other != flow_uuid
220    {
221        return Err(EngineError::Validation {
222            kind: ValidationKind::InvalidInput,
223            detail: format!("already_member_of_different_flow:{other}"),
224        });
225    }
226
227    // 5. Stamp exec.flow_id + bump flow counters.
228    sqlx::query(
229        "UPDATE ff_exec_core SET flow_id = $3 \
230         WHERE partition_key = $1 AND execution_id = $2",
231    )
232    .bind(part)
233    .bind(exec_uuid)
234    .bind(flow_uuid)
235    .execute(&mut *tx)
236    .await
237    .map_err(map_sqlx_error)?;
238
239    let bumped = sqlx::query(
240        r#"
241        UPDATE ff_flow_core
242           SET graph_revision = graph_revision + 1,
243               raw_fields = raw_fields
244                   || jsonb_build_object(
245                       'node_count',
246                       COALESCE((raw_fields->>'node_count')::int, 0) + 1,
247                       'last_mutation_at_ms', $3::bigint
248                   )
249         WHERE partition_key = $1 AND flow_id = $2
250         RETURNING (raw_fields->>'node_count')::int AS node_count
251        "#,
252    )
253    .bind(part)
254    .bind(flow_uuid)
255    .bind(now_ms)
256    .fetch_one(&mut *tx)
257    .await
258    .map_err(map_sqlx_error)?;
259    let new_nc: i32 = bumped.get("node_count");
260
261    tx.commit().await.map_err(map_sqlx_error)?;
262
263    Ok(AddExecutionToFlowResult::Added {
264        execution_id: args.execution_id.clone(),
265        new_node_count: u32::try_from(new_nc.max(0)).unwrap_or(0),
266    })
267}
268
269/// `ff_stage_dependency_edge` — CAS on graph_revision + INSERT into
270/// `ff_edge`.
271///
272/// Parity with `lua/flow.lua :: ff_stage_dependency_edge`:
273///
274///   * Rejects self-edges, cross-flow membership, terminal flows.
275///   * Mismatching `expected_graph_revision` →
276///     `Contention(StaleGraphRevision)`.
277///   * Duplicate edge_id → `Conflict(DependencyAlreadyExists { .. })`.
278///   * Cycle detection: not implemented here (deferred to a future
279///     Wave; the Valkey path's `detect_cycle` uses BFS over adjacency
280///     SETs that Postgres models as SELECTs — can land in Wave 4j /
281///     Wave 5b without re-opening the contract).
282///
283/// On success: writes `staged_at_ms = now`, `applied_at_ms = null`,
284/// `edge_state = 'pending'` into `policy` jsonb; bumps
285/// `graph_revision` and `raw_fields.edge_count` on `flow_core`.
286pub async fn stage_dependency_edge(
287    pool: &PgPool,
288    pc: &PartitionConfig,
289    args: &StageDependencyEdgeArgs,
290) -> Result<StageDependencyEdgeResult, EngineError> {
291    if args.upstream_execution_id == args.downstream_execution_id {
292        return Err(EngineError::Validation {
293            kind: ValidationKind::InvalidInput,
294            detail: "self_referencing_edge".to_string(),
295        });
296    }
297
298    let part = flow_partition_byte(&args.flow_id, pc);
299    let flow_uuid: Uuid = args.flow_id.0;
300    let edge_uuid: Uuid = args.edge_id.0;
301    let upstream_uuid = parse_exec_uuid(&args.upstream_execution_id)?;
302    let downstream_uuid = parse_exec_uuid(&args.downstream_execution_id)?;
303    let now_ms = args.now.0;
304    let expected_rev = i64::try_from(args.expected_graph_revision).unwrap_or(i64::MAX);
305
306    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
307
308    // 1. Flow must exist + not terminal + graph_revision matches. CAS
309    //    in one UPDATE: the WHERE clause pins the expected rev, and
310    //    the RETURNING tells us whether we won the race.
311    let bumped = sqlx::query(
312        r#"
313        UPDATE ff_flow_core
314           SET graph_revision = graph_revision + 1,
315               raw_fields = raw_fields
316                   || jsonb_build_object(
317                       'edge_count',
318                       COALESCE((raw_fields->>'edge_count')::int, 0) + 1,
319                       'last_mutation_at_ms', $4::bigint
320                   )
321         WHERE partition_key = $1
322           AND flow_id = $2
323           AND graph_revision = $3
324           AND public_flow_state = 'open'
325         RETURNING graph_revision
326        "#,
327    )
328    .bind(part)
329    .bind(flow_uuid)
330    .bind(expected_rev)
331    .bind(now_ms)
332    .fetch_optional(&mut *tx)
333    .await
334    .map_err(map_sqlx_error)?;
335
336    let Some(bumped_row) = bumped else {
337        // Distinguish "flow missing" / "terminal" / "stale rev".
338        let probe = sqlx::query(
339            "SELECT graph_revision, public_flow_state FROM ff_flow_core \
340             WHERE partition_key = $1 AND flow_id = $2",
341        )
342        .bind(part)
343        .bind(flow_uuid)
344        .fetch_optional(&mut *tx)
345        .await
346        .map_err(map_sqlx_error)?;
347        let _ = tx.rollback().await;
348        return match probe {
349            None => Err(EngineError::Validation {
350                kind: ValidationKind::InvalidInput,
351                detail: "flow_not_found".to_string(),
352            }),
353            Some(r) => {
354                let state: String = r.get("public_flow_state");
355                if matches!(state.as_str(), "cancelled" | "completed" | "failed") {
356                    Err(EngineError::Validation {
357                        kind: ValidationKind::InvalidInput,
358                        detail: "flow_already_terminal".to_string(),
359                    })
360                } else {
361                    Err(EngineError::Contention(ContentionKind::StaleGraphRevision))
362                }
363            }
364        };
365    };
366    let new_rev: i64 = bumped_row.get("graph_revision");
367
368    // 2. Membership — both execs must be on this flow.
369    let members: Vec<Uuid> = sqlx::query_scalar(
370        "SELECT execution_id FROM ff_exec_core \
371         WHERE partition_key = $1 AND flow_id = $2 \
372           AND execution_id = ANY($3)",
373    )
374    .bind(part)
375    .bind(flow_uuid)
376    .bind(&[upstream_uuid, downstream_uuid][..])
377    .fetch_all(&mut *tx)
378    .await
379    .map_err(map_sqlx_error)?;
380    if !members.contains(&upstream_uuid) || !members.contains(&downstream_uuid) {
381        let _ = tx.rollback().await;
382        return Err(EngineError::Validation {
383            kind: ValidationKind::InvalidInput,
384            detail: "execution_not_in_flow".to_string(),
385        });
386    }
387
388    // 3. Insert edge. `policy jsonb` carries the immutable edge record
389    //    (dependency_kind, data_passing_ref, staged_at_ms, etc.) so
390    //    Wave 4c's `decode_edge_row` picks it up unchanged.
391    let policy = json!({
392        "dependency_kind": args.dependency_kind,
393        "satisfaction_condition": "all_required",
394        "data_passing_ref": args.data_passing_ref.clone().unwrap_or_default(),
395        "edge_state": "pending",
396        "created_at_ms": now_ms,
397        "created_by": "engine",
398        "staged_at_ms": now_ms,
399        "applied_at_ms": serde_json::Value::Null,
400    });
401
402    let insert = sqlx::query(
403        r#"
404        INSERT INTO ff_edge
405            (partition_key, flow_id, edge_id, upstream_eid, downstream_eid,
406             policy, policy_version)
407        VALUES ($1, $2, $3, $4, $5, $6, 0)
408        ON CONFLICT (partition_key, flow_id, edge_id) DO NOTHING
409        RETURNING edge_id
410        "#,
411    )
412    .bind(part)
413    .bind(flow_uuid)
414    .bind(edge_uuid)
415    .bind(upstream_uuid)
416    .bind(downstream_uuid)
417    .bind(&policy)
418    .fetch_optional(&mut *tx)
419    .await
420    .map_err(map_sqlx_error)?;
421
422    if insert.is_none() {
423        // Edge already exists — surface the existing snapshot for
424        // `ConflictKind::DependencyAlreadyExists` parity. We need to
425        // commit the rev bump rollback since the CAS succeeded but the
426        // downstream write failed.
427        let _ = tx.rollback().await;
428        // Read back for the error payload.
429        let existing = crate::flow::describe_edge(pool, pc, &args.flow_id, &args.edge_id)
430            .await?
431            .ok_or_else(|| EngineError::Validation {
432                kind: ValidationKind::Corruption,
433                detail: "edge vanished between insert and describe".to_string(),
434            })?;
435        return Err(EngineError::Conflict(
436            ConflictKind::DependencyAlreadyExists { existing },
437        ));
438    }
439
440    tx.commit().await.map_err(map_sqlx_error)?;
441
442    Ok(StageDependencyEdgeResult::Staged {
443        edge_id: args.edge_id.clone(),
444        new_graph_revision: u64::try_from(new_rev).unwrap_or(0),
445    })
446}
447
448/// `ff_apply_dependency_to_child` — mark an edge applied + bump the
449/// downstream's edge-group aggregate.
450///
451/// Parity with `lua/flow.lua :: ff_apply_dependency_to_child`:
452///
453///   * Idempotent on already-applied (`policy.applied_at_ms` already
454///     non-null) → returns `AlreadyApplied`.
455///   * Bumps `ff_edge_group.running_count` (the Stage-A "unsatisfied
456///     required count" equivalent). Creates an `ff_edge_group` row
457///     with a default `AllOf` policy when none exists — matches the
458///     Lua path's inline `SET policy_variant='all_of'` fallback.
459///
460/// The cascade (downstream eligibility flip + sibling cancellation) is
461/// driven by Wave 5a's `dispatch::dispatch_completion` → this function
462/// only writes the per-edge state the cascade reads.
463pub async fn apply_dependency_to_child(
464    pool: &PgPool,
465    pc: &PartitionConfig,
466    args: &ApplyDependencyToChildArgs,
467) -> Result<ApplyDependencyToChildResult, EngineError> {
468    let part = flow_partition_byte(&args.flow_id, pc);
469    let flow_uuid: Uuid = args.flow_id.0;
470    let edge_uuid: Uuid = args.edge_id.0;
471    let downstream_uuid = parse_exec_uuid(&args.downstream_execution_id)?;
472    let now_ms = args.now.0;
473
474    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
475
476    // 1. Load + lock the edge row.
477    let edge_row = sqlx::query(
478        "SELECT policy FROM ff_edge \
479         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3 \
480         FOR UPDATE",
481    )
482    .bind(part)
483    .bind(flow_uuid)
484    .bind(edge_uuid)
485    .fetch_optional(&mut *tx)
486    .await
487    .map_err(map_sqlx_error)?;
488
489    let Some(edge_row) = edge_row else {
490        return Err(EngineError::Validation {
491            kind: ValidationKind::InvalidInput,
492            detail: "edge_not_found".to_string(),
493        });
494    };
495    let mut policy: serde_json::Value = edge_row.get("policy");
496
497    // 2. Idempotency: already applied? Count this edge in the running
498    //    group state if we have to create the edge_group row (fresh
499    //    flows), but do NOT double-bump `running_count`.
500    let already_applied = policy
501        .get("applied_at_ms")
502        .and_then(|v| v.as_i64())
503        .is_some();
504    if already_applied {
505        tx.commit().await.map_err(map_sqlx_error)?;
506        return Ok(ApplyDependencyToChildResult::AlreadyApplied);
507    }
508
509    // 3. Mark edge applied in the policy jsonb.
510    if let Some(obj) = policy.as_object_mut() {
511        obj.insert("applied_at_ms".to_string(), json!(now_ms));
512        obj.insert("edge_state".to_string(), json!("applied"));
513    }
514    sqlx::query(
515        "UPDATE ff_edge SET policy = $4 \
516         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
517    )
518    .bind(part)
519    .bind(flow_uuid)
520    .bind(edge_uuid)
521    .bind(&policy)
522    .execute(&mut *tx)
523    .await
524    .map_err(map_sqlx_error)?;
525
526    // 4. Upsert the edge_group row — create with default AllOf when
527    //    missing, bump `running_count` when present. `running_count`
528    //    is the "unsatisfied required" bucket the dispatch cascade
529    //    decrements on each completion.
530    sqlx::query(
531        r#"
532        INSERT INTO ff_edge_group
533            (partition_key, flow_id, downstream_eid, policy, running_count)
534        VALUES ($1, $2, $3, $4, 1)
535        ON CONFLICT (partition_key, flow_id, downstream_eid) DO UPDATE
536           SET running_count = ff_edge_group.running_count + 1
537        "#,
538    )
539    .bind(part)
540    .bind(flow_uuid)
541    .bind(downstream_uuid)
542    .bind(json!({ "kind": "all_of" }))
543    .execute(&mut *tx)
544    .await
545    .map_err(map_sqlx_error)?;
546
547    // Read back the post-update running_count for the result.
548    let unsatisfied: i32 = sqlx::query_scalar(
549        "SELECT running_count FROM ff_edge_group \
550         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
551    )
552    .bind(part)
553    .bind(flow_uuid)
554    .bind(downstream_uuid)
555    .fetch_one(&mut *tx)
556    .await
557    .map_err(map_sqlx_error)?;
558
559    tx.commit().await.map_err(map_sqlx_error)?;
560
561    Ok(ApplyDependencyToChildResult::Applied {
562        unsatisfied_count: u32::try_from(unsatisfied.max(0)).unwrap_or(0),
563    })
564}