Skip to main content

ff_backend_postgres/
flow.rs

1//! Postgres flow-family `EngineBackend` implementations (Wave 4c).
2//!
3//! Six trait methods over the Wave 3 schema:
4//!
5//! * [`describe_flow`] — single-flow snapshot (row + edge groups).
6//! * [`list_flows`] — cursor-paginated per-partition listing.
7//! * [`list_edges`] — fan-in / fan-out adjacency for an execution.
8//! * [`describe_edge`] — one edge by flow + edge id.
9//! * [`cancel_flow`] — SERIALIZABLE cascade with a 3-attempt retry
10//!   loop. Exhaustion surfaces as
11//!   [`ContentionKind::RetryExhausted`] (Q11 — one of the three
12//!   SERIALIZABLE sites).
13//! * [`set_edge_group_policy`] — RFC-016 Stage A policy write;
14//!   rejects when edges have already been staged for the downstream
15//!   group (parity with the Valkey `invalid_input` error path).
16//!
17//! Field layout note: `ff_flow_core` only hoists the columns the
18//! engine filters on (public_flow_state, graph_revision, created_at).
19//! Every other `FlowSnapshot` field — `flow_kind`, `namespace`,
20//! `node_count`, `edge_count`, `last_mutation_at_ms`, cancel meta,
21//! and namespaced tags — rides in `raw_fields jsonb`. Same story
22//! for `ff_edge`: the immutable edge record (dependency_kind,
23//! satisfaction_condition, etc.) lives in the `policy jsonb` column
24//! (the only jsonb slot on the edge row); the typed columns carry
25//! only endpoints + policy_version.
26
27use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::CancelFlowPolicy;
31use ff_core::contracts::{
32    CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33    EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34    SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37    ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
/// Attempt budget for the SERIALIZABLE retry loop in `cancel_flow`
/// (Q11). Once this many attempts have failed with a 40001 / 40P01
/// fault, the caller receives [`ContentionKind::RetryExhausted`] and
/// falls back to the reconciler.
const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54/// Extract a partition-index byte (0..=255) from a [`PartitionKey`].
55/// Returns a typed validation error on malformed keys.
56fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57    let p = key.parse().map_err(|e| EngineError::Validation {
58        kind: ValidationKind::InvalidInput,
59        detail: format!("partition_key: {e}"),
60    })?;
61    Ok(p.index as i16)
62}
63
64/// Compute the partition byte (0..=255) for a `FlowId` using the
65/// deployment's partition config. Routing parity with the Valkey
66/// backend's `flow_partition` — same crc16-CCITT modulo.
67fn flow_partition_byte(
68    flow_id: &FlowId,
69    partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71    ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74/// Decode a `raw_fields jsonb` string slot, falling back to `None`
75/// for missing / null / non-string values.
76fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77    raw.get(key).and_then(|v| v.as_str())
78}
79
80/// Decode a numeric slot from `raw_fields jsonb` as `u64`. Accepts
81/// JSON numbers or string-encoded ints (the Lua projector emits
82/// numbers; the Wave-4 proc emits the same).
83fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84    raw.get(key).and_then(|v| {
85        v.as_u64()
86            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87    })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91    raw.get(key).and_then(|v| {
92        v.as_u64()
93            .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94            .and_then(|n| u32::try_from(n).ok())
95    })
96}
97
98/// Route `raw_fields` jsonb object keys into a `(typed_fields, tags)`
99/// pair — anything matching `^[a-z][a-z0-9_]*\.` is a namespaced tag.
100fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101    let mut tags = BTreeMap::new();
102    let Some(obj) = raw.as_object() else {
103        return tags;
104    };
105    for (k, v) in obj {
106        if !is_namespaced_tag_key(k) {
107            continue;
108        }
109        if let Some(s) = v.as_str() {
110            tags.insert(k.clone(), s.to_owned());
111        }
112    }
113    tags
114}
115
/// Return `true` when `k` starts with a namespace prefix, i.e. it
/// matches `^[a-z][a-z0-9_]*\.`: an ASCII lowercase letter, then any
/// run of ASCII lowercase letters, digits or underscores, then a dot.
/// Whatever follows the first dot is not inspected.
fn is_namespaced_tag_key(k: &str) -> bool {
    // No dot at all means no namespace prefix.
    let Some((namespace, _rest)) = k.split_once('.') else {
        return false;
    };
    let mut prefix = namespace.chars();
    let starts_lowercase = matches!(prefix.next(), Some(c) if c.is_ascii_lowercase());
    starts_lowercase && prefix.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138    match s {
139        "let_run" => OnSatisfied::LetRun,
140        _ => OnSatisfied::CancelRemaining,
141    }
142}
143
144/// Decode an `EdgeDependencyPolicy` from the `ff_edge_group.policy`
145/// jsonb. Layout mirrors the ff-core `Serialize` impl:
146/// `{ "kind": "all_of" | "any_of" | "quorum", "on_satisfied": ..., "k": .. }`.
147fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148    let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149    match kind {
150        "any_of" => {
151            let on = v
152                .get("on_satisfied")
153                .and_then(|x| x.as_str())
154                .map(parse_on_satisfied)
155                .unwrap_or(OnSatisfied::CancelRemaining);
156            EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157        }
158        "quorum" => {
159            let k = v
160                .get("k")
161                .and_then(|x| x.as_u64())
162                .and_then(|n| u32::try_from(n).ok())
163                .unwrap_or(1);
164            let on = v
165                .get("on_satisfied")
166                .and_then(|x| x.as_str())
167                .map(parse_on_satisfied)
168                .unwrap_or(OnSatisfied::CancelRemaining);
169            EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170        }
171        _ => EdgeDependencyPolicy::AllOf,
172    }
173}
174
175fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
176    match p {
177        EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
178        EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
179            "kind": "any_of",
180            "on_satisfied": on_satisfied.variant_str(),
181        }),
182        EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
183            "kind": "quorum",
184            "k": k,
185            "on_satisfied": on_satisfied.variant_str(),
186        }),
187        // Forward-compat: unknown variants fall back to the Stage-A
188        // `all_of` encoding so a future `#[non_exhaustive]` addition
189        // doesn't corrupt the persisted row.
190        _ => serde_json::json!({ "kind": "all_of" }),
191    }
192}
193
194/// Decode one `ff_edge_group` row into an [`EdgeGroupSnapshot`].
195fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
196    let downstream_uuid: Uuid = row.get("downstream_eid");
197    let policy_raw: JsonValue = row.get("policy");
198    let k_target: i32 = row.get("k_target");
199    let success_count: i32 = row.get("success_count");
200    let fail_count: i32 = row.get("fail_count");
201    let skip_count: i32 = row.get("skip_count");
202    let running_count: i32 = row.get("running_count");
203
204    // Round-trip: exec id lives in the flow partition. Reconstruct
205    // the full `{fp:N}:<uuid>` form using the row's partition_key.
206    let part: i16 = row.get("partition_key");
207    let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
208        .map_err(|e| EngineError::Validation {
209            kind: ValidationKind::Corruption,
210            detail: format!("ff_edge_group.downstream_eid: {e}"),
211        })?;
212
213    let policy = decode_edge_policy(&policy_raw);
214    // Stage A: total_deps reported as success+fail+skip+running
215    // (equivalent to the RFC-007 `n` counter on the legacy hash).
216    let total: u32 = success_count.max(0) as u32
217        + fail_count.max(0) as u32
218        + skip_count.max(0) as u32
219        + running_count.max(0) as u32;
220
221    // group_state — Stage A derivation: satisfied when k_target met.
222    let state = if k_target > 0 && success_count >= k_target {
223        EdgeGroupState::Satisfied
224    } else {
225        EdgeGroupState::Pending
226    };
227
228    Ok(EdgeGroupSnapshot::new(
229        downstream_id,
230        policy,
231        total,
232        success_count.max(0) as u32,
233        fail_count.max(0) as u32,
234        skip_count.max(0) as u32,
235        running_count.max(0) as u32,
236        state,
237    ))
238}
239
240/// Decode one `ff_flow_core` row + its edge_group rows into a
241/// [`FlowSnapshot`].
242fn decode_flow_row(
243    flow_id: FlowId,
244    row: &PgRow,
245    edge_groups: Vec<EdgeGroupSnapshot>,
246) -> Result<FlowSnapshot, EngineError> {
247    let public_flow_state: String = row.get("public_flow_state");
248    let graph_revision_i: i64 = row.get("graph_revision");
249    let created_at_ms: i64 = row.get("created_at_ms");
250    let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
251    let raw_fields: JsonValue = row.get("raw_fields");
252
253    let flow_kind = raw_str(&raw_fields, "flow_kind")
254        .unwrap_or("")
255        .to_owned();
256    let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
257    let namespace = Namespace::new(namespace_str.to_owned());
258    let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
259    let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
260    let last_mutation_at_ms =
261        raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);
262
263    let cancelled_at = terminal_at_ms.map(TimestampMs);
264    let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
265    let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);
266
267    let tags = extract_tags(&raw_fields);
268
269    let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);
270
271    Ok(FlowSnapshot::new(
272        flow_id,
273        flow_kind,
274        namespace,
275        public_flow_state,
276        graph_revision,
277        node_count,
278        edge_count,
279        TimestampMs(created_at_ms),
280        TimestampMs(last_mutation_at_ms),
281        cancelled_at,
282        cancel_reason,
283        cancellation_policy,
284        tags,
285        edge_groups,
286    ))
287}
288
289/// Decode one `ff_edge` row into an [`EdgeSnapshot`].
290///
291/// The immutable edge record (dependency_kind, satisfaction_condition,
292/// data_passing_ref, edge_state, created_at, created_by) lives inside
293/// the `policy jsonb` column — the only jsonb slot on `ff_edge`.
294fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
295    let edge_uuid: Uuid = row.get("edge_id");
296    let upstream_uuid: Uuid = row.get("upstream_eid");
297    let downstream_uuid: Uuid = row.get("downstream_eid");
298    let part: i16 = row.get("partition_key");
299    let policy_raw: JsonValue = row.get("policy");
300
301    let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
302        .map_err(|e| EngineError::Validation {
303            kind: ValidationKind::Corruption,
304            detail: format!("ff_edge.upstream_eid: {e}"),
305        })?;
306    let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
307        .map_err(|e| EngineError::Validation {
308            kind: ValidationKind::Corruption,
309            detail: format!("ff_edge.downstream_eid: {e}"),
310        })?;
311
312    let dependency_kind = raw_str(&policy_raw, "dependency_kind")
313        .unwrap_or("success_only")
314        .to_owned();
315    let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
316        .unwrap_or("all_required")
317        .to_owned();
318    let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
319        .filter(|s| !s.is_empty())
320        .map(str::to_owned);
321    let edge_state = raw_str(&policy_raw, "edge_state")
322        .unwrap_or("pending")
323        .to_owned();
324    let created_at_ms =
325        raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
326    let created_by = raw_str(&policy_raw, "created_by")
327        .unwrap_or("engine")
328        .to_owned();
329
330    Ok(EdgeSnapshot::new(
331        EdgeId::from_uuid(edge_uuid),
332        flow_id.clone(),
333        upstream,
334        downstream,
335        dependency_kind,
336        satisfaction_condition,
337        data_passing_ref,
338        edge_state,
339        TimestampMs(created_at_ms),
340        created_by,
341    ))
342}
343
344/// `describe_flow` — single flow snapshot + edge_groups.
345pub async fn describe_flow(
346    pool: &PgPool,
347    partition_config: &ff_core::partition::PartitionConfig,
348    id: &FlowId,
349) -> Result<Option<FlowSnapshot>, EngineError> {
350    let part = flow_partition_byte(id, partition_config);
351    let flow_uuid: Uuid = id.0;
352
353    let flow_row_opt = sqlx::query(
354        "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
355                created_at_ms, terminal_at_ms, raw_fields \
356         FROM ff_flow_core \
357         WHERE partition_key = $1 AND flow_id = $2",
358    )
359    .bind(part)
360    .bind(flow_uuid)
361    .fetch_optional(pool)
362    .await
363    .map_err(map_sqlx_error)?;
364
365    let Some(flow_row) = flow_row_opt else {
366        return Ok(None);
367    };
368
369    let eg_rows = sqlx::query(
370        "SELECT partition_key, flow_id, downstream_eid, policy, \
371                k_target, success_count, fail_count, skip_count, running_count \
372         FROM ff_edge_group \
373         WHERE partition_key = $1 AND flow_id = $2 \
374         ORDER BY downstream_eid",
375    )
376    .bind(part)
377    .bind(flow_uuid)
378    .fetch_all(pool)
379    .await
380    .map_err(map_sqlx_error)?;
381
382    let mut edge_groups = Vec::with_capacity(eg_rows.len());
383    for row in &eg_rows {
384        edge_groups.push(decode_edge_group_row(row)?);
385    }
386
387    decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
388}
389
390/// `list_flows` — cursor-paginated per-partition flow listing.
391pub async fn list_flows(
392    pool: &PgPool,
393    partition: PartitionKey,
394    cursor: Option<FlowId>,
395    limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397    if limit == 0 {
398        return Ok(ListFlowsPage::new(Vec::new(), None));
399    }
400    let part = partition_index_from_key(&partition)?;
401    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402    // +1 row to decide whether next_cursor should be set.
403    let fetch_limit = (limit + 1) as i64;
404
405    let rows = sqlx::query(
406        "SELECT flow_id, created_at_ms, public_flow_state \
407         FROM ff_flow_core \
408         WHERE partition_key = $1 \
409           AND ($2::uuid IS NULL OR flow_id > $2) \
410         ORDER BY flow_id \
411         LIMIT $3",
412    )
413    .bind(part)
414    .bind(cursor_uuid)
415    .bind(fetch_limit)
416    .fetch_all(pool)
417    .await
418    .map_err(map_sqlx_error)?;
419
420    let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421    let mut next_cursor: Option<FlowId> = None;
422    for (idx, row) in rows.iter().enumerate() {
423        if idx >= limit {
424            // The (limit+1)-th row signals there is at least one more;
425            // we don't include it in the page, but the last included
426            // row's flow_id becomes the cursor.
427            if let Some(last) = flows.last() {
428                next_cursor = Some(last.flow_id.clone());
429            }
430            break;
431        }
432        let flow_uuid: Uuid = row.get("flow_id");
433        let created_at_ms: i64 = row.get("created_at_ms");
434        let public_state: String = row.get("public_flow_state");
435        let status = FlowStatus::from_public_flow_state(&public_state);
436        flows.push(FlowSummary::new(
437            FlowId::from_uuid(flow_uuid),
438            TimestampMs(created_at_ms),
439            status,
440        ));
441    }
442
443    Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
446/// `list_edges` — direction-keyed adjacency listing.
447pub async fn list_edges(
448    pool: &PgPool,
449    partition_config: &ff_core::partition::PartitionConfig,
450    flow_id: &FlowId,
451    direction: EdgeDirection,
452) -> Result<Vec<EdgeSnapshot>, EngineError> {
453    let part = flow_partition_byte(flow_id, partition_config);
454    let flow_uuid: Uuid = flow_id.0;
455    let (column_filter, subject_eid) = match &direction {
456        EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
457        EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
458    };
459    // Parse the UUID suffix from the `{fp:N}:<uuid>` id.
460    let subject_uuid = parse_exec_uuid(subject_eid)?;
461
462    let sql = format!(
463        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
464         FROM ff_edge \
465         WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
466         ORDER BY edge_id"
467    );
468    let rows = sqlx::query(&sql)
469        .bind(part)
470        .bind(flow_uuid)
471        .bind(subject_uuid)
472        .fetch_all(pool)
473        .await
474        .map_err(map_sqlx_error)?;
475
476    let mut out = Vec::with_capacity(rows.len());
477    for row in &rows {
478        out.push(decode_edge_row(row, flow_id)?);
479    }
480    Ok(out)
481}
482
483/// `describe_edge` — one edge by flow + edge id.
484pub async fn describe_edge(
485    pool: &PgPool,
486    partition_config: &ff_core::partition::PartitionConfig,
487    flow_id: &FlowId,
488    edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490    let part = flow_partition_byte(flow_id, partition_config);
491    let row_opt = sqlx::query(
492        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493         FROM ff_edge \
494         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495    )
496    .bind(part)
497    .bind(flow_id.0)
498    .bind(edge_id.0)
499    .fetch_optional(pool)
500    .await
501    .map_err(map_sqlx_error)?;
502
503    match row_opt {
504        Some(row) => decode_edge_row(&row, flow_id).map(Some),
505        None => Ok(None),
506    }
507}
508
509/// Parse the UUID suffix out of a hash-tagged `{fp:N}:<uuid>` exec id.
510fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511    let s = eid.as_str();
512    let Some(colon) = s.rfind("}:") else {
513        return Err(EngineError::Validation {
514            kind: ValidationKind::InvalidInput,
515            detail: format!("execution_id missing '}}:' delimiter: {s}"),
516        });
517    };
518    let tail = &s[colon + 2..];
519    Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520        kind: ValidationKind::InvalidInput,
521        detail: format!("execution_id uuid suffix parse: {e}"),
522    })
523}
524
525fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
526    match p {
527        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
528        CancelFlowPolicy::CancelAll => "cancel_all",
529        CancelFlowPolicy::CancelPending => "cancel_pending",
530        // Forward-compat for future additive `CancelFlowPolicy` variants
531        // — encode as `cancel_all` (safest conservative default).
532        _ => "cancel_all",
533    }
534}
535
536/// `cancel_flow` — SERIALIZABLE cascade with a 3-attempt retry loop.
537///
538/// Transaction steps:
539///   1. SELECT + UPDATE `ff_flow_core.public_flow_state = 'cancelled'`
540///      (atomic state flip).
541///   2. For `CancelAll`/`CancelPending`: SELECT member execution ids
542///      from `ff_exec_core` filtered by flow_id and state, UPDATE
543///      each to `cancelled`, INSERT a `ff_completion_event` row per
544///      member (the trigger fires NOTIFY at COMMIT).
545///   3. For `CancelPending` under `CancelRemaining` semantics (Stage
546///      C), INSERT a `ff_pending_cancel_groups` bookkeeping row for
547///      every running sibling group that the Wave-5 dispatcher needs
548///      to follow up on. Wave 4c only writes the bookkeeping row —
549///      the actual dispatch is Wave 5.
550///
551/// On `SQLSTATE 40001` / `40P01` (serialization / deadlock) the
552/// transaction ROLLBACKs, we sleep a tiny backoff, and retry. After
553/// [`CANCEL_FLOW_MAX_ATTEMPTS`] failed attempts we return
554/// [`ContentionKind::RetryExhausted`].
555pub async fn cancel_flow(
556    pool: &PgPool,
557    partition_config: &ff_core::partition::PartitionConfig,
558    id: &FlowId,
559    policy: CancelFlowPolicy,
560) -> Result<CancelFlowResult, EngineError> {
561    let part = flow_partition_byte(id, partition_config);
562    let flow_uuid: Uuid = id.0;
563    let policy_str = cancel_policy_to_str(policy);
564
565    let mut last_transport: Option<EngineError> = None;
566    for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
567        match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
568            Ok(result) => return Ok(result),
569            Err(err) => {
570                if is_serialization_conflict(&err) {
571                    // Tiny exponential backoff: 0, 5ms, 15ms.
572                    if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
573                        let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
574                        tokio::time::sleep(Duration::from_millis(ms)).await;
575                    }
576                    last_transport = Some(err);
577                    continue;
578                }
579                return Err(err);
580            }
581        }
582    }
583    // Exhausted all attempts on serialization failures.
584    let _ = last_transport; // drop; retained only for trace-level diagnosis
585    Err(EngineError::Contention(ContentionKind::RetryExhausted))
586}
587
588/// Return `true` when an `EngineError` was produced by the 40001 /
589/// 40P01 sqlx mapping (Contention(LeaseConflict)). We re-map that
590/// specific kind into the retry loop; other `Contention` variants
591/// (e.g. honest lease contention from a future extension) propagate
592/// untouched.
593fn is_serialization_conflict(err: &EngineError) -> bool {
594    matches!(
595        err,
596        EngineError::Contention(ContentionKind::LeaseConflict)
597    )
598}
599
600async fn cancel_flow_once(
601    pool: &PgPool,
602    part: i16,
603    flow_uuid: Uuid,
604    policy: CancelFlowPolicy,
605    policy_str: &str,
606) -> Result<CancelFlowResult, EngineError> {
607    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
608
609    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
610        .execute(&mut *tx)
611        .await
612        .map_err(map_sqlx_error)?;
613
614    // Step 1 — atomically flip the flow_core state. If the flow
615    // doesn't exist we return `Cancelled { members: [] }` with empty
616    // members (idempotent retry after the row has already been
617    // cancelled returns the same shape).
618    let flow_found = sqlx::query(
619        "UPDATE ff_flow_core \
620         SET public_flow_state = 'cancelled', \
621             terminal_at_ms = COALESCE(terminal_at_ms, \
622                 (extract(epoch from clock_timestamp())*1000)::bigint), \
623             raw_fields = raw_fields \
624                 || jsonb_build_object('cancellation_policy', $3::text) \
625         WHERE partition_key = $1 AND flow_id = $2 \
626         RETURNING flow_id",
627    )
628    .bind(part)
629    .bind(flow_uuid)
630    .bind(policy_str)
631    .fetch_optional(&mut *tx)
632    .await
633    .map_err(map_sqlx_error)?;
634
635    if flow_found.is_none() {
636        tx.commit().await.map_err(map_sqlx_error)?;
637        return Ok(CancelFlowResult::Cancelled {
638            cancellation_policy: policy_str.to_owned(),
639            member_execution_ids: Vec::new(),
640        });
641    }
642
643    // Step 2 — collect + cancel member executions under the policy.
644    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
645        Vec::new()
646    } else {
647        let state_filter: &str = match policy {
648            CancelFlowPolicy::CancelAll => {
649                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
650            }
651            CancelFlowPolicy::CancelPending => {
652                "lifecycle_phase IN ('pending','blocked','eligible')"
653            }
654            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
655        };
656        let sql = format!(
657            "SELECT execution_id FROM ff_exec_core \
658             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
659        );
660        sqlx::query(&sql)
661            .bind(part)
662            .bind(flow_uuid)
663            .fetch_all(&mut *tx)
664            .await
665            .map_err(map_sqlx_error)?
666    };
667
668    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
669    for row in &member_rows {
670        let exec_uuid: Uuid = row.get("execution_id");
671        // Flip the member to `cancelled` lifecycle + eligibility.
672        sqlx::query(
673            "UPDATE ff_exec_core \
674             SET lifecycle_phase = 'cancelled', \
675                 eligibility_state = 'cancelled', \
676                 public_state = 'cancelled', \
677                 terminal_at_ms = COALESCE(terminal_at_ms, \
678                     (extract(epoch from clock_timestamp())*1000)::bigint), \
679                 cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
680                 cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
681             WHERE partition_key = $1 AND execution_id = $2",
682        )
683        .bind(part)
684        .bind(exec_uuid)
685        .execute(&mut *tx)
686        .await
687        .map_err(map_sqlx_error)?;
688
689        // #355: clear the current attempt's `outcome` so a later
690        // `read_execution_info` doesn't surface a stale
691        // `retry`/`interrupted` terminal-outcome on the cancelled row.
692        // Mirror of the SQLite companion statement in
693        // `ff-backend-sqlite/src/queries/flow.rs`
694        // (`UPDATE_ATTEMPT_CLEAR_OUTCOME_FOR_CURRENT_SQL`).
695        sqlx::query(
696            "UPDATE ff_attempt \
697                SET outcome = NULL \
698              WHERE partition_key = $1 \
699                AND execution_id  = $2 \
700                AND attempt_index = (SELECT attempt_index FROM ff_exec_core \
701                                      WHERE partition_key = $1 AND execution_id = $2)",
702        )
703        .bind(part)
704        .bind(exec_uuid)
705        .execute(&mut *tx)
706        .await
707        .map_err(map_sqlx_error)?;
708
709        // Emit a completion outbox row (trigger fires NOTIFY).
710        sqlx::query(
711            "INSERT INTO ff_completion_event \
712             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
713             VALUES ($1, $2, $3, 'cancelled', \
714                     (extract(epoch from clock_timestamp())*1000)::bigint)",
715        )
716        .bind(part)
717        .bind(exec_uuid)
718        .bind(flow_uuid)
719        .execute(&mut *tx)
720        .await
721        .map_err(map_sqlx_error)?;
722
723        // RFC-019 Stage B outbox: lease revoked on every cancelled
724        // member. Members that never held a live lease surface a
725        // no-op revoke to consumers — same shape as the Valkey side.
726        sqlx::query(
727            "INSERT INTO ff_lease_event \
728             (execution_id, lease_id, event_type, occurred_at_ms, partition_key) \
729             VALUES ($1, NULL, 'revoked', \
730                     (extract(epoch from clock_timestamp())*1000)::bigint, $2)",
731        )
732        .bind(exec_uuid.to_string())
733        .bind(i32::from(part))
734        .execute(&mut *tx)
735        .await
736        .map_err(map_sqlx_error)?;
737
738        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
739    }
740
741    // Step 3 — for CancelPending, record any running sibling groups
742    // into ff_pending_cancel_groups so the Wave-5 dispatcher can
743    // follow up. Wave 4c only does the bookkeeping write.
744    if matches!(policy, CancelFlowPolicy::CancelPending) {
745        sqlx::query(
746            "INSERT INTO ff_pending_cancel_groups \
747             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
748             SELECT partition_key, flow_id, downstream_eid, \
749                    (extract(epoch from clock_timestamp())*1000)::bigint \
750             FROM ff_edge_group \
751             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
752             ON CONFLICT DO NOTHING",
753        )
754        .bind(part)
755        .bind(flow_uuid)
756        .execute(&mut *tx)
757        .await
758        .map_err(map_sqlx_error)?;
759    }
760
761    tx.commit().await.map_err(map_sqlx_error)?;
762
763    Ok(CancelFlowResult::Cancelled {
764        cancellation_policy: policy_str.to_owned(),
765        member_execution_ids,
766    })
767}
768
769/// `set_edge_group_policy` — RFC-016 Stage A.
770///
771/// Reject when at least one edge has already been staged for
772/// `downstream_execution_id` (the ordering rule: policy must be set
773/// BEFORE the first `add_dependency`). Parity with the Valkey Lua
774/// path, which returns `invalid_input` → `EngineError::Validation`
775/// in the same situation (not `Conflict`, per in-tree behavior).
776///
777/// Stage A supports `AllOf` fully; `AnyOf` / `Quorum` flow through
778/// the same write path (Stage B's resolver is backend-specific —
779/// Valkey's Lua honours them, Postgres Wave 4c stores them and Wave
780/// 5 wires the resolver). Callers that need the Stage-A ordering
781/// check can rely on this method regardless of the variant.
782pub async fn set_edge_group_policy(
783    pool: &PgPool,
784    partition_config: &ff_core::partition::PartitionConfig,
785    flow_id: &FlowId,
786    downstream_execution_id: &ExecutionId,
787    policy: EdgeDependencyPolicy,
788) -> Result<SetEdgeGroupPolicyResult, EngineError> {
789    // Light input validation mirroring the Valkey Stage B impl.
790    match &policy {
791        EdgeDependencyPolicy::AllOf => {}
792        EdgeDependencyPolicy::AnyOf { .. } => {}
793        EdgeDependencyPolicy::Quorum { k, .. } => {
794            if *k == 0 {
795                return Err(EngineError::Validation {
796                    kind: ValidationKind::InvalidInput,
797                    detail: "quorum k must be >= 1".to_string(),
798                });
799            }
800        }
801        _ => {
802            return Err(EngineError::Validation {
803                kind: ValidationKind::InvalidInput,
804                detail: "unknown EdgeDependencyPolicy variant".to_string(),
805            });
806        }
807    }
808
809    let part = flow_partition_byte(flow_id, partition_config);
810    let flow_uuid: Uuid = flow_id.0;
811    let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;
812
813    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
814
815    // Ordering guard: edges already staged for this group?
816    let already_staged: i64 = sqlx::query_scalar(
817        "SELECT COUNT(*) FROM ff_edge \
818         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
819    )
820    .bind(part)
821    .bind(flow_uuid)
822    .bind(downstream_uuid)
823    .fetch_one(&mut *tx)
824    .await
825    .map_err(map_sqlx_error)?;
826
827    if already_staged > 0 {
828        let _ = tx.rollback().await;
829        return Err(EngineError::Validation {
830            kind: ValidationKind::InvalidInput,
831            detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
832        });
833    }
834
835    // Idempotent restate: if an identical policy is already stored,
836    // return AlreadySet without touching the row.
837    let existing: Option<JsonValue> = sqlx::query_scalar(
838        "SELECT policy FROM ff_edge_group \
839         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
840    )
841    .bind(part)
842    .bind(flow_uuid)
843    .bind(downstream_uuid)
844    .fetch_optional(&mut *tx)
845    .await
846    .map_err(map_sqlx_error)?;
847
848    let encoded = encode_edge_policy(&policy);
849    if let Some(existing_policy) = existing
850        && existing_policy == encoded
851    {
852        tx.commit().await.map_err(map_sqlx_error)?;
853        return Ok(SetEdgeGroupPolicyResult::AlreadySet);
854    }
855
856    sqlx::query(
857        "INSERT INTO ff_edge_group \
858         (partition_key, flow_id, downstream_eid, policy) \
859         VALUES ($1, $2, $3, $4) \
860         ON CONFLICT (partition_key, flow_id, downstream_eid) \
861         DO UPDATE SET policy = EXCLUDED.policy",
862    )
863    .bind(part)
864    .bind(flow_uuid)
865    .bind(downstream_uuid)
866    .bind(&encoded)
867    .execute(&mut *tx)
868    .await
869    .map_err(map_sqlx_error)?;
870
871    tx.commit().await.map_err(map_sqlx_error)?;
872    Ok(SetEdgeGroupPolicyResult::Set)
873}
874
875// Silence an unused-import lint if PartitionFamily/Partition imports
876// aren't otherwise referenced after trimming.
877#[allow(dead_code)]
878fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}
879
880// ─── set_flow_tag / get_flow_tag (issue #433) ─────────────────────
881//
882// Flow tags are stored as *top-level* `raw_fields` keys (not nested
883// under `raw_fields.tags`) — matches the `extract_tags` projection
884// used by `describe_flow`. Valkey's `ff_set_flow_tags` Lua lazy-
885// migrates any pre-58.4 inline fields to the dedicated `:tags` key;
886// PG has no such migration because the on-disk shape has always been
887// top-level `raw_fields` keys.
888
889/// Upsert a single namespaced tag on a flow. Key assumed pre-validated
890/// by `ff_core::engine_backend::validate_tag_key`. Missing flow →
891/// `EngineError::NotFound { entity: "flow" }`.
892pub(super) async fn set_flow_tag_impl(
893    pool: &sqlx::PgPool,
894    partition_config: &ff_core::partition::PartitionConfig,
895    flow_id: &FlowId,
896    key: &str,
897    value: &str,
898) -> Result<(), EngineError> {
899    let part = flow_partition_byte(flow_id, partition_config);
900    let flow_uuid: Uuid = flow_id.0;
901
902    let result = sqlx::query(
903        r#"
904        UPDATE ff_flow_core
905        SET raw_fields = jsonb_set(
906            COALESCE(raw_fields, '{}'::jsonb),
907            ARRAY[$3::text],
908            to_jsonb($4::text),
909            true
910        )
911        WHERE partition_key = $1 AND flow_id = $2
912        "#,
913    )
914    .bind(part)
915    .bind(flow_uuid)
916    .bind(key)
917    .bind(value)
918    .execute(pool)
919    .await
920    .map_err(map_sqlx_error)?;
921
922    if result.rows_affected() == 0 {
923        return Err(EngineError::NotFound { entity: "flow" });
924    }
925    Ok(())
926}
927
928/// Point-read of `raw_fields->><key>` on `ff_flow_core`. `Ok(None)`
929/// covers missing-tag and missing-flow alike — matches Valkey's
930/// `HGET` collapse (see `EngineBackend::get_flow_tag` rustdoc).
931pub(super) async fn get_flow_tag_impl(
932    pool: &sqlx::PgPool,
933    partition_config: &ff_core::partition::PartitionConfig,
934    flow_id: &FlowId,
935    key: &str,
936) -> Result<Option<String>, EngineError> {
937    let part = flow_partition_byte(flow_id, partition_config);
938    let flow_uuid: Uuid = flow_id.0;
939
940    let row: Option<(Option<String>,)> = sqlx::query_as(
941        r#"
942        SELECT raw_fields->>$3
943        FROM ff_flow_core
944        WHERE partition_key = $1 AND flow_id = $2
945        "#,
946    )
947    .bind(part)
948    .bind(flow_uuid)
949    .bind(key)
950    .fetch_optional(pool)
951    .await
952    .map_err(map_sqlx_error)?;
953
954    Ok(row.and_then(|(tag,)| tag))
955}
956
957// ── PR-7b Cluster 2b-B — flow summary projection ─────────────────────
958//
// One SQL round-trip aggregates member `public_state` counts from
// `ff_exec_core` (partition-local via RFC-011 exec-flow co-location);
// a second round-trip UPSERTs into `ff_flow_summary` (migration 0019).
962// Distinct from `ff_flow_core.public_flow_state`: the summary
963// `public_flow_state` is a DERIVED rollup, not the authoritative
964// mutation-guard state. See RFC-007 §Flow Summary Projection.
965
966/// Derive + UPSERT the flow summary row. Returns `Ok(true)` when the
967/// summary was written, `Ok(false)` when the flow has no members yet
968/// (nothing to project).
969pub(super) async fn project_flow_summary_impl(
970    pool: &PgPool,
971    partition_key: i16,
972    flow_uuid: Uuid,
973    now_ms: i64,
974) -> Result<bool, EngineError> {
975    // Aggregate per-state member counts for this flow. `partition_key`
976    // + `flow_id` hits `ff_exec_core_flow_idx`; no cross-partition
977    // fan-out. Every state bucket consumed by the summary projection
978    // is enumerated as a separate COUNT so the resulting row is
979    // unambiguous even when `public_state` is a state not known to
980    // this scanner version.
981    let row = sqlx::query_as::<_, (
982        i64, i64, i64, i64, i64, i64, i64, i64, i64, i64, i64, i64,
983    )>(
984        r#"
985        SELECT
986            COUNT(*) FILTER (WHERE public_state = 'completed'),
987            COUNT(*) FILTER (WHERE public_state = 'failed'),
988            COUNT(*) FILTER (WHERE public_state = 'cancelled'),
989            COUNT(*) FILTER (WHERE public_state = 'expired'),
990            COUNT(*) FILTER (WHERE public_state = 'skipped'),
991            COUNT(*) FILTER (WHERE public_state = 'active'),
992            COUNT(*) FILTER (WHERE public_state = 'suspended'),
993            COUNT(*) FILTER (WHERE public_state = 'waiting'),
994            COUNT(*) FILTER (WHERE public_state = 'delayed'),
995            COUNT(*) FILTER (WHERE public_state = 'rate_limited'),
996            COUNT(*) FILTER (WHERE public_state = 'waiting_children'),
997            COUNT(*)
998        FROM ff_exec_core
999        WHERE partition_key = $1 AND flow_id = $2
1000        "#,
1001    )
1002    .bind(partition_key)
1003    .bind(flow_uuid)
1004    .fetch_one(pool)
1005    .await
1006    .map_err(map_sqlx_error)?;
1007
1008    let (completed, failed, cancelled, expired, skipped, active, suspended,
1009         waiting, delayed, rate_limited, waiting_children, total_members) = row;
1010
1011    if total_members == 0 {
1012        return Ok(false);
1013    }
1014
1015    // PG aggregates every member in one SELECT so `sampled == total`
1016    // by construction — the Valkey "sample-based" shape is a
1017    // constant-memory workaround we don't need on SQL.
1018    let sampled_i32 = i32::try_from(total_members).unwrap_or(i32::MAX);
1019    let terminal_count = completed + failed + cancelled + expired + skipped;
1020    let all_terminal = terminal_count == total_members;
1021
1022    let flow_state: &str = if all_terminal {
1023        if failed > 0 || cancelled > 0 || expired > 0 {
1024            "failed"
1025        } else {
1026            "completed"
1027        }
1028    } else if active > 0 {
1029        "running"
1030    } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
1031        "blocked"
1032    } else {
1033        "open"
1034    };
1035
1036    // INSERT ... ON CONFLICT DO UPDATE — idempotent re-projection.
1037    sqlx::query(
1038        r#"
1039        INSERT INTO ff_flow_summary (
1040            partition_key, flow_id,
1041            total_members, sampled_members,
1042            members_completed, members_failed, members_cancelled,
1043            members_expired, members_skipped, members_active,
1044            members_suspended, members_waiting, members_delayed,
1045            members_rate_limited, members_waiting_children,
1046            public_flow_state, last_summary_update_at
1047        ) VALUES (
1048            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
1049            $11, $12, $13, $14, $15, $16, $17
1050        )
1051        ON CONFLICT (partition_key, flow_id) DO UPDATE SET
1052            total_members = EXCLUDED.total_members,
1053            sampled_members = EXCLUDED.sampled_members,
1054            members_completed = EXCLUDED.members_completed,
1055            members_failed = EXCLUDED.members_failed,
1056            members_cancelled = EXCLUDED.members_cancelled,
1057            members_expired = EXCLUDED.members_expired,
1058            members_skipped = EXCLUDED.members_skipped,
1059            members_active = EXCLUDED.members_active,
1060            members_suspended = EXCLUDED.members_suspended,
1061            members_waiting = EXCLUDED.members_waiting,
1062            members_delayed = EXCLUDED.members_delayed,
1063            members_rate_limited = EXCLUDED.members_rate_limited,
1064            members_waiting_children = EXCLUDED.members_waiting_children,
1065            public_flow_state = EXCLUDED.public_flow_state,
1066            last_summary_update_at = EXCLUDED.last_summary_update_at
1067        "#,
1068    )
1069    .bind(partition_key)
1070    .bind(flow_uuid)
1071    .bind(total_members)
1072    .bind(sampled_i32)
1073    .bind(i32::try_from(completed).unwrap_or(i32::MAX))
1074    .bind(i32::try_from(failed).unwrap_or(i32::MAX))
1075    .bind(i32::try_from(cancelled).unwrap_or(i32::MAX))
1076    .bind(i32::try_from(expired).unwrap_or(i32::MAX))
1077    .bind(i32::try_from(skipped).unwrap_or(i32::MAX))
1078    .bind(i32::try_from(active).unwrap_or(i32::MAX))
1079    .bind(i32::try_from(suspended).unwrap_or(i32::MAX))
1080    .bind(i32::try_from(waiting).unwrap_or(i32::MAX))
1081    .bind(i32::try_from(delayed).unwrap_or(i32::MAX))
1082    .bind(i32::try_from(rate_limited).unwrap_or(i32::MAX))
1083    .bind(i32::try_from(waiting_children).unwrap_or(i32::MAX))
1084    .bind(flow_state)
1085    .bind(now_ms)
1086    .execute(pool)
1087    .await
1088    .map_err(map_sqlx_error)?;
1089
1090    Ok(true)
1091}