// ff_backend_postgres/flow.rs
1//! Postgres flow-family `EngineBackend` implementations (Wave 4c).
2//!
3//! Six trait methods over the Wave 3 schema:
4//!
5//! * [`describe_flow`] — single-flow snapshot (row + edge groups).
6//! * [`list_flows`] — cursor-paginated per-partition listing.
7//! * [`list_edges`] — fan-in / fan-out adjacency for an execution.
8//! * [`describe_edge`] — one edge by flow + edge id.
9//! * [`cancel_flow`] — SERIALIZABLE cascade with a 3-attempt retry
10//!   loop. Exhaustion surfaces as
11//!   [`ContentionKind::RetryExhausted`] (Q11 — one of the three
12//!   SERIALIZABLE sites).
13//! * [`set_edge_group_policy`] — RFC-016 Stage A policy write;
14//!   rejects when edges have already been staged for the downstream
15//!   group (parity with the Valkey `invalid_input` error path).
16//!
17//! Field layout note: `ff_flow_core` only hoists the columns the
18//! engine filters on (public_flow_state, graph_revision, created_at).
19//! Every other `FlowSnapshot` field — `flow_kind`, `namespace`,
20//! `node_count`, `edge_count`, `last_mutation_at_ms`, cancel meta,
21//! and namespaced tags — rides in `raw_fields jsonb`. Same story
22//! for `ff_edge`: the immutable edge record (dependency_kind,
23//! satisfaction_condition, etc.) lives in the `policy jsonb` column
24//! (the only jsonb slot on the edge row); the typed columns carry
25//! only endpoints + policy_version.
26
27use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::CancelFlowPolicy;
31use ff_core::contracts::{
32    CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33    EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34    SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37    ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
48/// Max attempts for the SERIALIZABLE retry loop on `cancel_flow`
49/// (Q11). After this many 40001 / 40P01 faults the caller sees
50/// [`ContentionKind::RetryExhausted`] and falls back to the
51/// reconciler.
52const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54/// Extract a partition-index byte (0..=255) from a [`PartitionKey`].
55/// Returns a typed validation error on malformed keys.
56fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57    let p = key.parse().map_err(|e| EngineError::Validation {
58        kind: ValidationKind::InvalidInput,
59        detail: format!("partition_key: {e}"),
60    })?;
61    Ok(p.index as i16)
62}
63
64/// Compute the partition byte (0..=255) for a `FlowId` using the
65/// deployment's partition config. Routing parity with the Valkey
66/// backend's `flow_partition` — same crc16-CCITT modulo.
67fn flow_partition_byte(
68    flow_id: &FlowId,
69    partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71    ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74/// Decode a `raw_fields jsonb` string slot, falling back to `None`
75/// for missing / null / non-string values.
76fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77    raw.get(key).and_then(|v| v.as_str())
78}
79
80/// Decode a numeric slot from `raw_fields jsonb` as `u64`. Accepts
81/// JSON numbers or string-encoded ints (the Lua projector emits
82/// numbers; the Wave-4 proc emits the same).
83fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84    raw.get(key).and_then(|v| {
85        v.as_u64()
86            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87    })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91    raw.get(key).and_then(|v| {
92        v.as_u64()
93            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94            .and_then(|n| u32::try_from(n).ok())
95    })
96}
97
98/// Route `raw_fields` jsonb object keys into a `(typed_fields, tags)`
99/// pair — anything matching `^[a-z][a-z0-9_]*\.` is a namespaced tag.
100fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101    let mut tags = BTreeMap::new();
102    let Some(obj) = raw.as_object() else {
103        return tags;
104    };
105    for (k, v) in obj {
106        if !is_namespaced_tag_key(k) {
107            continue;
108        }
109        if let Some(s) = v.as_str() {
110            tags.insert(k.clone(), s.to_owned());
111        }
112    }
113    tags
114}
115
/// True when `k` starts with a lowercase-identifier namespace prefix
/// followed by a dot — i.e. it matches `^[a-z][a-z0-9_]*\.`.
fn is_namespaced_tag_key(k: &str) -> bool {
    let Some(dot) = k.find('.') else {
        return false;
    };
    if dot == 0 {
        return false;
    }
    // Validate the prefix before the first dot: lowercase leader,
    // then lowercase / digit / underscore.
    let mut prefix = k[..dot].chars();
    prefix.next().is_some_and(|c| c.is_ascii_lowercase())
        && prefix.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138    match s {
139        "let_run" => OnSatisfied::LetRun,
140        _ => OnSatisfied::CancelRemaining,
141    }
142}
143
144/// Decode an `EdgeDependencyPolicy` from the `ff_edge_group.policy`
145/// jsonb. Layout mirrors the ff-core `Serialize` impl:
146/// `{ "kind": "all_of" | "any_of" | "quorum", "on_satisfied": ..., "k": .. }`.
147fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148    let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149    match kind {
150        "any_of" => {
151            let on = v
152                .get("on_satisfied")
153                .and_then(|x| x.as_str())
154                .map(parse_on_satisfied)
155                .unwrap_or(OnSatisfied::CancelRemaining);
156            EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157        }
158        "quorum" => {
159            let k = v
160                .get("k")
161                .and_then(|x| x.as_u64())
162                .and_then(|n| u32::try_from(n).ok())
163                .unwrap_or(1);
164            let on = v
165                .get("on_satisfied")
166                .and_then(|x| x.as_str())
167                .map(parse_on_satisfied)
168                .unwrap_or(OnSatisfied::CancelRemaining);
169            EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170        }
171        _ => EdgeDependencyPolicy::AllOf,
172    }
173}
174
175fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
176    match p {
177        EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
178        EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
179            "kind": "any_of",
180            "on_satisfied": on_satisfied.variant_str(),
181        }),
182        EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
183            "kind": "quorum",
184            "k": k,
185            "on_satisfied": on_satisfied.variant_str(),
186        }),
187        // Forward-compat: unknown variants fall back to the Stage-A
188        // `all_of` encoding so a future `#[non_exhaustive]` addition
189        // doesn't corrupt the persisted row.
190        _ => serde_json::json!({ "kind": "all_of" }),
191    }
192}
193
/// Decode one `ff_edge_group` row into an [`EdgeGroupSnapshot`].
///
/// Expects `downstream_eid`, `policy`, the four outcome counters and
/// `partition_key` in the row; a missing column panics inside
/// `Row::get` (query/schema mismatch is a programmer error here).
fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
    let downstream_uuid: Uuid = row.get("downstream_eid");
    let policy_raw: JsonValue = row.get("policy");
    let k_target: i32 = row.get("k_target");
    let success_count: i32 = row.get("success_count");
    let fail_count: i32 = row.get("fail_count");
    let skip_count: i32 = row.get("skip_count");
    let running_count: i32 = row.get("running_count");

    // Round-trip: exec id lives in the flow partition. Reconstruct
    // the full `{fp:N}:<uuid>` form using the row's partition_key.
    // A parse failure means the stored row is malformed, hence
    // `Corruption` rather than `InvalidInput`.
    let part: i16 = row.get("partition_key");
    let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge_group.downstream_eid: {e}"),
        })?;

    let policy = decode_edge_policy(&policy_raw);
    // Stage A: total_deps reported as success+fail+skip+running
    // (equivalent to the RFC-007 `n` counter on the legacy hash).
    // `.max(0)` clamps any negative counter drift before the cast so
    // the unsigned totals can never wrap.
    let total: u32 = success_count.max(0) as u32
        + fail_count.max(0) as u32
        + skip_count.max(0) as u32
        + running_count.max(0) as u32;

    // group_state — Stage A derivation: satisfied when k_target met.
    // A non-positive k_target (e.g. unset) always reads as Pending.
    let state = if k_target > 0 && success_count >= k_target {
        EdgeGroupState::Satisfied
    } else {
        EdgeGroupState::Pending
    };

    Ok(EdgeGroupSnapshot::new(
        downstream_id,
        policy,
        total,
        success_count.max(0) as u32,
        fail_count.max(0) as u32,
        skip_count.max(0) as u32,
        running_count.max(0) as u32,
        state,
    ))
}
239
/// Decode one `ff_flow_core` row + its edge_group rows into a
/// [`FlowSnapshot`].
///
/// Typed columns supply the engine-filterable fields; everything
/// else comes out of `raw_fields` jsonb with conservative fallbacks
/// (empty kind, `"default"` namespace, zero counts) so a sparse row
/// still decodes.
fn decode_flow_row(
    flow_id: FlowId,
    row: &PgRow,
    edge_groups: Vec<EdgeGroupSnapshot>,
) -> Result<FlowSnapshot, EngineError> {
    let public_flow_state: String = row.get("public_flow_state");
    let graph_revision_i: i64 = row.get("graph_revision");
    let created_at_ms: i64 = row.get("created_at_ms");
    let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
    let raw_fields: JsonValue = row.get("raw_fields");

    let flow_kind = raw_str(&raw_fields, "flow_kind")
        .unwrap_or("")
        .to_owned();
    let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
    let namespace = Namespace::new(namespace_str.to_owned());
    let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
    let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
    // Falls back to created_at when no mutation was ever recorded.
    // NOTE(review): `n as i64` assumes the stored value fits i64 —
    // TODO confirm the writer never emits values above i64::MAX.
    let last_mutation_at_ms =
        raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);

    // NOTE(review): terminal_at_ms is surfaced as `cancelled_at`
    // unconditionally — presumably only cancelled flows set it;
    // verify against the writer for completed/failed flows.
    let cancelled_at = terminal_at_ms.map(TimestampMs);
    let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
    let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);

    let tags = extract_tags(&raw_fields);

    // A negative revision (impossible under normal writes) clamps to 0.
    let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);

    Ok(FlowSnapshot::new(
        flow_id,
        flow_kind,
        namespace,
        public_flow_state,
        graph_revision,
        node_count,
        edge_count,
        TimestampMs(created_at_ms),
        TimestampMs(last_mutation_at_ms),
        cancelled_at,
        cancel_reason,
        cancellation_policy,
        tags,
        edge_groups,
    ))
}
288
/// Decode one `ff_edge` row into an [`EdgeSnapshot`].
///
/// The immutable edge record (dependency_kind, satisfaction_condition,
/// data_passing_ref, edge_state, created_at, created_by) lives inside
/// the `policy jsonb` column — the only jsonb slot on `ff_edge`.
/// Each jsonb slot gets a conservative default when absent so a
/// sparse payload still decodes; endpoint parse failures surface as
/// `Corruption` (the stored row itself is malformed).
fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
    let edge_uuid: Uuid = row.get("edge_id");
    let upstream_uuid: Uuid = row.get("upstream_eid");
    let downstream_uuid: Uuid = row.get("downstream_eid");
    let part: i16 = row.get("partition_key");
    let policy_raw: JsonValue = row.get("policy");

    // Rebuild the hash-tagged `{fp:N}:<uuid>` execution ids from the
    // stored uuid + the row's partition byte.
    let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.upstream_eid: {e}"),
        })?;
    let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.downstream_eid: {e}"),
        })?;

    let dependency_kind = raw_str(&policy_raw, "dependency_kind")
        .unwrap_or("success_only")
        .to_owned();
    let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
        .unwrap_or("all_required")
        .to_owned();
    // Empty string is normalised to None (no data-passing reference).
    let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
        .filter(|s| !s.is_empty())
        .map(str::to_owned);
    let edge_state = raw_str(&policy_raw, "edge_state")
        .unwrap_or("pending")
        .to_owned();
    // NOTE(review): a missing created_at_ms decodes as epoch 0 —
    // confirm the writer always records it.
    let created_at_ms =
        raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
    let created_by = raw_str(&policy_raw, "created_by")
        .unwrap_or("engine")
        .to_owned();

    Ok(EdgeSnapshot::new(
        EdgeId::from_uuid(edge_uuid),
        flow_id.clone(),
        upstream,
        downstream,
        dependency_kind,
        satisfaction_condition,
        data_passing_ref,
        edge_state,
        TimestampMs(created_at_ms),
        created_by,
    ))
}
343
/// `describe_flow` — single flow snapshot + edge_groups.
///
/// Returns `Ok(None)` when no `ff_flow_core` row exists for the
/// (partition, flow_id) pair. The flow row and its edge-group rows
/// are fetched as two separate statements on the pool, not inside a
/// shared transaction — NOTE(review): the snapshot can therefore mix
/// two read points if a write lands in between; confirm callers
/// tolerate that.
pub async fn describe_flow(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    id: &FlowId,
) -> Result<Option<FlowSnapshot>, EngineError> {
    // Same crc16 routing as the Valkey backend; the partition byte is
    // part of every primary key in the Wave 3 schema.
    let part = flow_partition_byte(id, partition_config);
    let flow_uuid: Uuid = id.0;

    let flow_row_opt = sqlx::query(
        "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
                created_at_ms, terminal_at_ms, raw_fields \
         FROM ff_flow_core \
         WHERE partition_key = $1 AND flow_id = $2",
    )
    .bind(part)
    .bind(flow_uuid)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;

    let Some(flow_row) = flow_row_opt else {
        return Ok(None);
    };

    // Edge groups sorted by downstream id for a stable snapshot order.
    let eg_rows = sqlx::query(
        "SELECT partition_key, flow_id, downstream_eid, policy, \
                k_target, success_count, fail_count, skip_count, running_count \
         FROM ff_edge_group \
         WHERE partition_key = $1 AND flow_id = $2 \
         ORDER BY downstream_eid",
    )
    .bind(part)
    .bind(flow_uuid)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;

    let mut edge_groups = Vec::with_capacity(eg_rows.len());
    for row in &eg_rows {
        edge_groups.push(decode_edge_group_row(row)?);
    }

    decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
}
389
390/// `list_flows` — cursor-paginated per-partition flow listing.
391pub async fn list_flows(
392    pool: &PgPool,
393    partition: PartitionKey,
394    cursor: Option<FlowId>,
395    limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397    if limit == 0 {
398        return Ok(ListFlowsPage::new(Vec::new(), None));
399    }
400    let part = partition_index_from_key(&partition)?;
401    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402    // +1 row to decide whether next_cursor should be set.
403    let fetch_limit = (limit + 1) as i64;
404
405    let rows = sqlx::query(
406        "SELECT flow_id, created_at_ms, public_flow_state \
407         FROM ff_flow_core \
408         WHERE partition_key = $1 \
409           AND ($2::uuid IS NULL OR flow_id > $2) \
410         ORDER BY flow_id \
411         LIMIT $3",
412    )
413    .bind(part)
414    .bind(cursor_uuid)
415    .bind(fetch_limit)
416    .fetch_all(pool)
417    .await
418    .map_err(map_sqlx_error)?;
419
420    let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421    let mut next_cursor: Option<FlowId> = None;
422    for (idx, row) in rows.iter().enumerate() {
423        if idx >= limit {
424            // The (limit+1)-th row signals there is at least one more;
425            // we don't include it in the page, but the last included
426            // row's flow_id becomes the cursor.
427            if let Some(last) = flows.last() {
428                next_cursor = Some(last.flow_id.clone());
429            }
430            break;
431        }
432        let flow_uuid: Uuid = row.get("flow_id");
433        let created_at_ms: i64 = row.get("created_at_ms");
434        let public_state: String = row.get("public_flow_state");
435        let status = FlowStatus::from_public_flow_state(&public_state);
436        flows.push(FlowSummary::new(
437            FlowId::from_uuid(flow_uuid),
438            TimestampMs(created_at_ms),
439            status,
440        ));
441    }
442
443    Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
/// `list_edges` — direction-keyed adjacency listing.
///
/// `Outgoing { from_node }` matches edges whose upstream endpoint is
/// `from_node` (fan-out); `Incoming { to_node }` matches edges whose
/// downstream endpoint is `to_node` (fan-in). Results are ordered by
/// edge_id for a stable listing.
pub async fn list_edges(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let (column_filter, subject_eid) = match &direction {
        EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
        EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
    };
    // Parse the UUID suffix from the `{fp:N}:<uuid>` id.
    let subject_uuid = parse_exec_uuid(subject_eid)?;

    // The `{column_filter}` interpolation is injection-safe: the value
    // comes only from the fixed match above, never from caller input.
    // All caller-supplied values go through bind parameters.
    let sql = format!(
        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
         FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
         ORDER BY edge_id"
    );
    let rows = sqlx::query(&sql)
        .bind(part)
        .bind(flow_uuid)
        .bind(subject_uuid)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    let mut out = Vec::with_capacity(rows.len());
    for row in &rows {
        out.push(decode_edge_row(row, flow_id)?);
    }
    Ok(out)
}
482
483/// `describe_edge` — one edge by flow + edge id.
484pub async fn describe_edge(
485    pool: &PgPool,
486    partition_config: &ff_core::partition::PartitionConfig,
487    flow_id: &FlowId,
488    edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490    let part = flow_partition_byte(flow_id, partition_config);
491    let row_opt = sqlx::query(
492        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493         FROM ff_edge \
494         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495    )
496    .bind(part)
497    .bind(flow_id.0)
498    .bind(edge_id.0)
499    .fetch_optional(pool)
500    .await
501    .map_err(map_sqlx_error)?;
502
503    match row_opt {
504        Some(row) => decode_edge_row(&row, flow_id).map(Some),
505        None => Ok(None),
506    }
507}
508
509/// Parse the UUID suffix out of a hash-tagged `{fp:N}:<uuid>` exec id.
510fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511    let s = eid.as_str();
512    let Some(colon) = s.rfind("}:") else {
513        return Err(EngineError::Validation {
514            kind: ValidationKind::InvalidInput,
515            detail: format!("execution_id missing '}}:' delimiter: {s}"),
516        });
517    };
518    let tail = &s[colon + 2..];
519    Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520        kind: ValidationKind::InvalidInput,
521        detail: format!("execution_id uuid suffix parse: {e}"),
522    })
523}
524
525fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
526    match p {
527        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
528        CancelFlowPolicy::CancelAll => "cancel_all",
529        CancelFlowPolicy::CancelPending => "cancel_pending",
530        // Forward-compat for future additive `CancelFlowPolicy` variants
531        // — encode as `cancel_all` (safest conservative default).
532        _ => "cancel_all",
533    }
534}
535
536/// `cancel_flow` — SERIALIZABLE cascade with a 3-attempt retry loop.
537///
538/// Transaction steps:
539///   1. SELECT + UPDATE `ff_flow_core.public_flow_state = 'cancelled'`
540///      (atomic state flip).
541///   2. For `CancelAll`/`CancelPending`: SELECT member execution ids
542///      from `ff_exec_core` filtered by flow_id and state, UPDATE
543///      each to `cancelled`, INSERT a `ff_completion_event` row per
544///      member (the trigger fires NOTIFY at COMMIT).
545///   3. For `CancelPending` under `CancelRemaining` semantics (Stage
546///      C), INSERT a `ff_pending_cancel_groups` bookkeeping row for
547///      every running sibling group that the Wave-5 dispatcher needs
548///      to follow up on. Wave 4c only writes the bookkeeping row —
549///      the actual dispatch is Wave 5.
550///
551/// On `SQLSTATE 40001` / `40P01` (serialization / deadlock) the
552/// transaction ROLLBACKs, we sleep a tiny backoff, and retry. After
553/// [`CANCEL_FLOW_MAX_ATTEMPTS`] failed attempts we return
554/// [`ContentionKind::RetryExhausted`].
555pub async fn cancel_flow(
556    pool: &PgPool,
557    partition_config: &ff_core::partition::PartitionConfig,
558    id: &FlowId,
559    policy: CancelFlowPolicy,
560) -> Result<CancelFlowResult, EngineError> {
561    let part = flow_partition_byte(id, partition_config);
562    let flow_uuid: Uuid = id.0;
563    let policy_str = cancel_policy_to_str(policy);
564
565    let mut last_transport: Option<EngineError> = None;
566    for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
567        match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
568            Ok(result) => return Ok(result),
569            Err(err) => {
570                if is_serialization_conflict(&err) {
571                    // Tiny exponential backoff: 0, 5ms, 15ms.
572                    if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
573                        let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
574                        tokio::time::sleep(Duration::from_millis(ms)).await;
575                    }
576                    last_transport = Some(err);
577                    continue;
578                }
579                return Err(err);
580            }
581        }
582    }
583    // Exhausted all attempts on serialization failures.
584    let _ = last_transport; // drop; retained only for trace-level diagnosis
585    Err(EngineError::Contention(ContentionKind::RetryExhausted))
586}
587
588/// Return `true` when an `EngineError` was produced by the 40001 /
589/// 40P01 sqlx mapping (Contention(LeaseConflict)). We re-map that
590/// specific kind into the retry loop; other `Contention` variants
591/// (e.g. honest lease contention from a future extension) propagate
592/// untouched.
593fn is_serialization_conflict(err: &EngineError) -> bool {
594    matches!(
595        err,
596        EngineError::Contention(ContentionKind::LeaseConflict)
597    )
598}
599
/// One SERIALIZABLE attempt of the cancel cascade. [`cancel_flow`]
/// owns the retry loop; a 40001/40P01 fault surfaces here as the
/// mapped `Contention` error and the whole transaction rolls back.
async fn cancel_flow_once(
    pool: &PgPool,
    part: i16,
    flow_uuid: Uuid,
    policy: CancelFlowPolicy,
    policy_str: &str,
) -> Result<CancelFlowResult, EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Must be the first statement in the transaction — Postgres
    // rejects SET TRANSACTION once any query has executed.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Step 1 — atomically flip the flow_core state. If the flow
    // doesn't exist we return `Cancelled { members: [] }` with empty
    // members (idempotent retry after the row has already been
    // cancelled returns the same shape). COALESCE keeps the first
    // terminal timestamp; the jsonb `||` merge records which policy
    // drove the cancellation.
    let flow_found = sqlx::query(
        "UPDATE ff_flow_core \
         SET public_flow_state = 'cancelled', \
             terminal_at_ms = COALESCE(terminal_at_ms, \
                 (extract(epoch from clock_timestamp())*1000)::bigint), \
             raw_fields = raw_fields \
                 || jsonb_build_object('cancellation_policy', $3::text) \
         WHERE partition_key = $1 AND flow_id = $2 \
         RETURNING flow_id",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(policy_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if flow_found.is_none() {
        // NOTE(review): an unknown flow_id also lands here and reads
        // as a successful no-op cancel — confirm callers don't need a
        // NotFound distinction.
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowResult::Cancelled {
            cancellation_policy: policy_str.to_owned(),
            member_execution_ids: Vec::new(),
        });
    }

    // Step 2 — collect + cancel member executions under the policy.
    // FlowOnly touches no members at all.
    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
        Vec::new()
    } else {
        // `state_filter` is one of the fixed literals below — the
        // format! interpolation never sees caller input.
        let state_filter: &str = match policy {
            CancelFlowPolicy::CancelAll => {
                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
            }
            CancelFlowPolicy::CancelPending => {
                "lifecycle_phase IN ('pending','blocked','eligible')"
            }
            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
        };
        let sql = format!(
            "SELECT execution_id FROM ff_exec_core \
             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
        );
        sqlx::query(&sql)
            .bind(part)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?
    };

    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
    for row in &member_rows {
        let exec_uuid: Uuid = row.get("execution_id");
        // Flip the member to `cancelled` lifecycle + eligibility.
        // COALESCEs keep any pre-existing terminal / cancel metadata.
        sqlx::query(
            "UPDATE ff_exec_core \
             SET lifecycle_phase = 'cancelled', \
                 eligibility_state = 'cancelled', \
                 public_state = 'cancelled', \
                 terminal_at_ms = COALESCE(terminal_at_ms, \
                     (extract(epoch from clock_timestamp())*1000)::bigint), \
                 cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
                 cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
             WHERE partition_key = $1 AND execution_id = $2",
        )
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Emit a completion outbox row (trigger fires NOTIFY).
        sqlx::query(
            "INSERT INTO ff_completion_event \
             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
             VALUES ($1, $2, $3, 'cancelled', \
                     (extract(epoch from clock_timestamp())*1000)::bigint)",
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Members are reported in hash-tagged `{fp:N}:<uuid>` form.
        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
    }

    // Step 3 — for CancelPending, record any running sibling groups
    // into ff_pending_cancel_groups so the Wave-5 dispatcher can
    // follow up. Wave 4c only does the bookkeeping write; ON CONFLICT
    // makes the enqueue idempotent across retries.
    if matches!(policy, CancelFlowPolicy::CancelPending) {
        sqlx::query(
            "INSERT INTO ff_pending_cancel_groups \
             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
             SELECT partition_key, flow_id, downstream_eid, \
                    (extract(epoch from clock_timestamp())*1000)::bigint \
             FROM ff_edge_group \
             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
             ON CONFLICT DO NOTHING",
        )
        .bind(part)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(CancelFlowResult::Cancelled {
        cancellation_policy: policy_str.to_owned(),
        member_execution_ids,
    })
}
733
/// `set_edge_group_policy` — RFC-016 Stage A.
///
/// Reject when at least one edge has already been staged for
/// `downstream_execution_id` (the ordering rule: policy must be set
/// BEFORE the first `add_dependency`). Parity with the Valkey Lua
/// path, which returns `invalid_input` → `EngineError::Validation`
/// in the same situation (not `Conflict`, per in-tree behavior).
///
/// Stage A supports `AllOf` fully; `AnyOf` / `Quorum` flow through
/// the same write path (Stage B's resolver is backend-specific —
/// Valkey's Lua honours them, Postgres Wave 4c stores them and Wave
/// 5 wires the resolver). Callers that need the Stage-A ordering
/// check can rely on this method regardless of the variant.
pub async fn set_edge_group_policy(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    downstream_execution_id: &ExecutionId,
    policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    // Light input validation mirroring the Valkey Stage B impl.
    // Only Quorum carries a value to check; the wildcard rejects any
    // future variant this backend doesn't know how to encode yet.
    match &policy {
        EdgeDependencyPolicy::AllOf => {}
        EdgeDependencyPolicy::AnyOf { .. } => {}
        EdgeDependencyPolicy::Quorum { k, .. } => {
            if *k == 0 {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "quorum k must be >= 1".to_string(),
                });
            }
        }
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "unknown EdgeDependencyPolicy variant".to_string(),
            });
        }
    }

    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Ordering guard: edges already staged for this group? (Runs at
    // the pool's default isolation level — not one of the three
    // SERIALIZABLE sites.)
    let already_staged: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if already_staged > 0 {
        // Best-effort rollback — the validation error is what the
        // caller needs to see, not a rollback transport failure.
        let _ = tx.rollback().await;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
        });
    }

    // Idempotent restate: if an identical policy is already stored,
    // return AlreadySet without touching the row.
    let existing: Option<JsonValue> = sqlx::query_scalar(
        "SELECT policy FROM ff_edge_group \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Equality is on the encoded jsonb form, so any two policies that
    // serialise identically count as the same policy.
    let encoded = encode_edge_policy(&policy);
    if let Some(existing_policy) = existing
        && existing_policy == encoded
    {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(SetEdgeGroupPolicyResult::AlreadySet);
    }

    // Upsert: insert the group row, or overwrite a different existing
    // policy (allowed because the ordering guard above proved no
    // edges have been staged yet).
    sqlx::query(
        "INSERT INTO ff_edge_group \
         (partition_key, flow_id, downstream_eid, policy) \
         VALUES ($1, $2, $3, $4) \
         ON CONFLICT (partition_key, flow_id, downstream_eid) \
         DO UPDATE SET policy = EXCLUDED.policy",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .bind(&encoded)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(SetEdgeGroupPolicyResult::Set)
}
839
// Keep the `Partition` / `PartitionFamily` imports referenced so the
// unused-import lint stays quiet if their other uses are trimmed.
#[allow(dead_code)]
fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}